Skip to content
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
6ead46d
Add metrics and logging
JaySon-Huang Mar 21, 2026
66a18b4
metrics: Update grafana panel
JaySon-Huang Mar 21, 2026
14140e0
Add debug logging
JaySon-Huang Mar 21, 2026
40ac415
tests: add write-group remote apply behavior coverage
JaySon-Huang Mar 21, 2026
5856c85
fix(page): return writer-scoped applied remote lock ids
JaySon-Huang Mar 21, 2026
d2b5ace
cleanup(page): remove investigation logs and keep actionable signals
JaySon-Huang Mar 22, 2026
f9b1b68
refactor(page): simplify remote write and lock cleanup logging
JaySon-Huang Mar 22, 2026
9aad725
docs: refine exception and logging guidance
JaySon-Huang Mar 22, 2026
998eb35
refactor(page): centralize remote write count handling
JaySon-Huang Mar 22, 2026
5a1e219
fix(page): cleanup pre-lock keys on write failure
JaySon-Huang Mar 22, 2026
83df9b1
tests(page): document purposes and steps for lock cleanup cases
JaySon-Huang Mar 22, 2026
98d0036
tests(page): force async launch for syncpoint concurrency cases
JaySon-Huang Mar 22, 2026
b360e87
fix(page): avoid partial pre-lock residue on lock creation
JaySon-Huang Mar 22, 2026
7f0b357
tests(page): cover lock return semantics and partial failure
JaySon-Huang Mar 22, 2026
bda0e5a
tests(page): enforce async launch in write-group syncpoint tests
JaySon-Huang Mar 22, 2026
48d3aa7
Format codes
JaySon-Huang Mar 22, 2026
ed82d79
enhance(s3gc): add owner-only periodic storage summary task
JaySon-Huang Mar 22, 2026
7b09efe
metrics: Update grafana panel about tiflash_storage_s3_store_summary_…
JaySon-Huang Mar 22, 2026
b007911
Revert "enhance(s3gc): add owner-only periodic storage summary task"
JaySon-Huang Mar 23, 2026
7393170
Revert the changes in grafana
JaySon-Huang Mar 23, 2026
c71feec
Address comment
JaySon-Huang Mar 23, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,12 @@ TiFlash follows a style based on Google, enforced by `clang-format` 17.0.0+.
- **Smart Pointers:** Prefer `std::shared_ptr` and `std::unique_ptr`. Use `std::make_shared` and `std::make_unique`.
- **Error Handling:**
- Use `DB::Exception`.
- Pattern: `throw Exception("Message", ErrorCodes::SOME_CODE);`
- Prefer the fmt-style constructor with error code first: `throw Exception(ErrorCodes::SOME_CODE, "Message with {}", arg);`
- For fixed strings without formatting, `throw Exception("Message", ErrorCodes::SOME_CODE);` is still acceptable.
- Error codes are defined in `dbms/src/Common/ErrorCodes.cpp` and `errors.toml`.
- In broad `catch (...)` paths, prefer `tryLogCurrentException(log, "context")` to avoid duplicated exception-formatting code.
- **Logging:** Use macros like `LOG_INFO(log, "message {}", arg)`. `log` is usually a `DB::LoggerPtr`.
- When only log level differs by runtime condition, prefer `LOG_IMPL(log, level, ...)` (with `Poco::Message::Priority`) instead of duplicated `if/else` log blocks.

### Modern C++ Practices
- Prefer `auto` for complex iterators/templates.
Expand Down
9 changes: 9 additions & 0 deletions dbms/src/Common/TiFlashMetrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -846,6 +846,15 @@ static_assert(RAFT_REGION_BIG_WRITE_THRES * 4 < RAFT_REGION_BIG_WRITE_MAX, "Inva
F(type_to_finished, {"type", "to_finished"}), \
F(type_to_error, {"type", "to_error"}), \
F(type_to_cancelled, {"type", "to_cancelled"})) \
M(tiflash_storage_s3_lock_mgr_status, "S3 Lock Manager", Gauge, F(type_prelock_keys, {{"type", "prelock_keys"}})) \
M(tiflash_storage_s3_lock_mgr_counter, \
"S3 Lock Manager Counter", \
Counter, \
F(type_create_lock_local, {{"type", "create_lock_local"}}), \
F(type_create_lock_ingest, {{"type", "create_lock_ingest"}}), \
F(type_clean_lock, {{"type", "clean_lock"}}), \
F(type_clean_lock_erase_hit, {{"type", "clean_lock_erase_hit"}}), \
F(type_clean_lock_erase_miss, {{"type", "clean_lock_erase_miss"}})) \
M(tiflash_storage_s3_gc_status, \
"S3 GC status", \
Gauge, \
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1283,6 +1283,10 @@ SegmentPtr DeltaMergeStore::segmentMergeDelta(

if (!isSegmentValid(lock, segment))
{
LOG_INFO(
log,
"MergeDelta - Give up segmentMergeDelta because segment not valid, reason=concurrent_update segment={}",
segment->simpleInfo());
LOG_DEBUG(
log,
"MergeDelta - Give up segmentMergeDelta because segment not valid, segment={}",
Expand Down
6 changes: 6 additions & 0 deletions dbms/src/Storages/DeltaMerge/WriteBatchesImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,12 @@ struct WriteBatches : private boost::noncopyable

void rollbackWrittenLogAndData()
{
LOG_INFO(
Logger::get("WriteBatches"),
"Rollback written log/data, run_mode={} written_log_pages={} written_data_pages={}",
magic_enum::enum_name(run_mode),
written_log.size(),
written_data.size());
WriteBatchWrapper log_wb(run_mode, keyspace_id, StorageType::Log, ns_id);
for (auto p : written_log)
log_wb.delPage(p);
Expand Down
25 changes: 14 additions & 11 deletions dbms/src/Storages/Page/V3/PageDirectory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
#include <shared_mutex>
#include <type_traits>
#include <utility>
#include <vector>


#ifdef FIU_ENABLE
Expand Down Expand Up @@ -1612,6 +1613,13 @@ std::unordered_set<String> PageDirectory<Trait>::apply(PageEntriesEdit && edit,
CurrentMetrics::Increment pending_writer_size{CurrentMetrics::PSPendingWriterNum};
Writer w;
w.edit = &edit;
// Capture this writer's checkpoint data_file_ids before write-group merge.
// Followers' edit objects are cleared by the owner during merge.
for (const auto & r : edit.getRecords())
{
if (r.entry.checkpoint_info.has_value())
w.applied_data_files.emplace(*r.entry.checkpoint_info.data_location.data_file_id);
}

Stopwatch watch;
std::unique_lock apply_lock(apply_mutex);
Expand Down Expand Up @@ -1639,9 +1647,9 @@ std::unordered_set<String> PageDirectory<Trait>::apply(PageEntriesEdit && edit,
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown exception");
}
}
// the `applied_data_files` will be returned by the write
// group owner, others just return an empty set.
return {};
// Return per-writer ids instead of merged-group ids, so upper-layer
// lock cleanup can always clean locks created by this writer.
return std::move(w.applied_data_files);
}

/// This thread now is the write group owner, build the group. It will merge the
Expand Down Expand Up @@ -1703,7 +1711,6 @@ std::unordered_set<String> PageDirectory<Trait>::apply(PageEntriesEdit && edit,
});

SYNC_FOR("before_PageDirectory::apply_to_memory");
std::unordered_set<String> applied_data_files;
{
std::unique_lock table_lock(table_rw_mutex);

Expand Down Expand Up @@ -1775,12 +1782,6 @@ std::unordered_set<String> PageDirectory<Trait>::apply(PageEntriesEdit && edit,
"should not handle edit with invalid type, type={}",
magic_enum::enum_name(r.type));
}

// collect the applied remote data_file_ids
if (r.entry.checkpoint_info.has_value())
{
applied_data_files.emplace(*r.entry.checkpoint_info.data_location.data_file_id);
}
}
catch (DB::Exception & e)
{
Expand All @@ -1800,7 +1801,9 @@ std::unordered_set<String> PageDirectory<Trait>::apply(PageEntriesEdit && edit,
}

success = true;
return applied_data_files;
// Even for write-group owner, return only this writer's pre-captured ids.
// Other writers return their own ids in the `w.done` branch above.
return std::move(w.applied_data_files);
}

template <typename Trait>
Expand Down
3 changes: 3 additions & 0 deletions dbms/src/Storages/Page/V3/PageDirectory.h
Original file line number Diff line number Diff line change
Expand Up @@ -628,6 +628,9 @@ class PageDirectory
struct Writer
{
PageEntriesEdit * edit;
// Keep per-writer checkpoint lock keys before write-group merge so
// followers can still return their own applied ids for lock cleanup.
std::unordered_set<String> applied_data_files;
bool done = false; // The work has been performed by other thread
bool success = false; // The work complete successfully
std::unique_ptr<DB::Exception> exception;
Expand Down
112 changes: 95 additions & 17 deletions dbms/src/Storages/Page/V3/Universal/S3LockLocalManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@

#include <Common/Exception.h>
#include <Common/Stopwatch.h>
#include <Common/TiFlashMetrics.h>
#include <Flash/Disaggregated/S3LockClient.h>
#include <Poco/Message.h>
#include <Storages/Page/V3/CheckpointFile/CPManifestFileReader.h>
#include <Storages/Page/V3/CheckpointFile/Proto/manifest_file.pb.h>
#include <Storages/Page/V3/Universal/S3LockLocalManager.h>
Expand All @@ -23,6 +25,7 @@
#include <Storages/S3/S3Common.h>
#include <Storages/S3/S3Filename.h>
#include <Storages/S3/S3RandomAccessFile.h>
#include <common/logger_useful.h>

#include <magic_enum.hpp>

Expand All @@ -40,7 +43,7 @@ S3LockLocalManager::S3LockLocalManager()
{}

// `store_id` is initialized later because it may not be
// accessible when S3LockLocalManager is created.
std::optional<CheckpointProto::ManifestFilePrefix> S3LockLocalManager::initStoreInfo(
StoreID actual_store_id,
DB::S3::S3LockClientPtr s3lock_client_,
Expand Down Expand Up @@ -137,7 +140,7 @@ S3LockLocalManager::ExtraLockInfo S3LockLocalManager::allocateNewUploadLocksInfo
};
}

void S3LockLocalManager::createS3LockForWriteBatch(UniversalWriteBatch & write_batch)
std::unordered_set<String> S3LockLocalManager::createS3LockForWriteBatch(UniversalWriteBatch & write_batch)
{
waitUntilInited();

Expand All @@ -162,6 +165,10 @@ void S3LockLocalManager::createS3LockForWriteBatch(UniversalWriteBatch & write_b
}
}

// If multiple data files need locks but only some of the locks get created, the
// created "locks" will be cleaned up by S3GCManager, because `pre_lock_keys` does not
// contain the keys from such a partially-created batch.
std::vector<String> lock_keys_to_append;
for (auto & [input_key, lock_key] : s3_datafiles_to_lock)
{
auto view = S3::S3FilenameView::fromKey(input_key);
Expand All @@ -170,13 +177,35 @@ void S3LockLocalManager::createS3LockForWriteBatch(UniversalWriteBatch & write_b
"invalid data_file_id, input_key={} type={}",
input_key,
magic_enum::enum_name(view.type));
// Already a lock file, which means the data file has been locked. This can happen when
// FAP apply a write batch with pages reference a file that is already uploaded. Just
// reuse the existing lock file
if (view.isLockFile())
{
lock_key = std::make_shared<String>(input_key);
continue;
}
// Only a data file, we need to create a lock file for it.
auto lock_result = createS3Lock(input_key, view, store_id);
lock_key = std::make_shared<String>(lock_result);
lock_keys_to_append.push_back(lock_result);
}

{
// The related S3 data files in the write batch are not yet applied into PageDirectory,
// but we need to ensure they exist in the next manifest file so that these
// S3 data files will not be deleted by the S3GCManager.
// Add the lock file keys to `pre_lock_keys` for manifest uploading.
std::unique_lock wlatch_keys(mtx_lock_keys);
for (const auto & lock_key : lock_keys_to_append)
{
const auto [_, inserted] = pre_lock_keys.emplace(lock_key);
if (!inserted)
{
LOG_WARNING(log, "Duplicate pre-lock key detected, lockkey={} lock_store_id={}", lock_key, store_id);
}
}
GET_METRIC(tiflash_storage_s3_lock_mgr_status, type_prelock_keys).Set(pre_lock_keys.size());
}

for (auto & w : write_batch.getMutWrites())
Expand All @@ -200,8 +229,13 @@ void S3LockLocalManager::createS3LockForWriteBatch(UniversalWriteBatch & write_b
break;
}
}

// Return only the lock keys newly appended into `pre_lock_keys`.
// Existing lock-file inputs are intentionally excluded.
return std::unordered_set<String>(lock_keys_to_append.begin(), lock_keys_to_append.end());
}

// If any "lock" fails to be created, this function throws an exception.
String S3LockLocalManager::createS3Lock(
const String & datafile_key,
const S3::S3FilenameView & s3_file,
Expand All @@ -224,7 +258,8 @@ String S3LockLocalManager::createS3Lock(
// TODO: handle s3 network error and retry?
auto s3_client = S3::ClientFactory::instance().sharedTiFlashClient();
S3::uploadEmptyFile(*s3_client, lockkey);
LOG_DEBUG(log, "S3 lock created for local datafile, lockkey={}", lockkey);
GET_METRIC(tiflash_storage_s3_lock_mgr_counter, type_create_lock_local).Increment();
LOG_DEBUG(log, "S3 lock created for local datafile, datafile_key={} lockkey={}", datafile_key, lockkey);
}
else
{
Expand All @@ -237,29 +272,72 @@ String S3LockLocalManager::createS3Lock(
{
throw Exception(ErrorCodes::S3_LOCK_CONFLICT, err_msg);
}
LOG_DEBUG(log, "S3 lock created for ingest datafile, lockkey={}", lockkey);
GET_METRIC(tiflash_storage_s3_lock_mgr_counter, type_create_lock_ingest).Increment();
LOG_DEBUG(log, "S3 lock created for ingest datafile, datafile_key={} lockkey={}", datafile_key, lockkey);
}

// The related S3 data files in write batch is not applied into PageDirectory,
// but we need to ensure they exist in the next manifest file so that these
// S3 data files will not be deleted by the S3GCManager.
// Add the lock file key to `pre_locks_files` for manifest uploading.
return lockkey;
}

/// Erase the given lock keys from `pre_lock_keys` under `mtx_lock_keys` and
/// refresh the pre-lock gauge while still holding the mutex.
/// Returns {erase_hit, erase_miss, remaining_pre_lock_keys} so callers can do
/// their logging and counter updates outside the critical section.
/// Note: stray leftover statements referencing `lockkey` (diff residue from the
/// old `createS3Lock` tail) were removed — `lockkey` is not in scope here.
std::tuple<std::size_t, std::size_t, std::size_t> S3LockLocalManager::cleanPreLockKeysImpl(
    const std::unordered_set<String> & lock_keys_to_clean)
{
    size_t erase_hit = 0;
    size_t erase_miss = 0;
    size_t remaining_pre_lock_keys = 0;
    {
        // After the entries applied into PageDirectory, manifest can get the S3 lock key
        // from `VersionedPageEntries`, cleanup the pre lock files.
        std::unique_lock wlatch_keys(mtx_lock_keys);
        for (const auto & file : lock_keys_to_clean)
        {
            // A miss means the key was never pre-locked or was already cleaned;
            // callers treat misses as a warning-worthy signal.
            if (pre_lock_keys.erase(file) > 0)
            {
                ++erase_hit;
            }
            else
            {
                ++erase_miss;
            }
        }
        remaining_pre_lock_keys = pre_lock_keys.size();
        // Keep the gauge in sync with the set size under the same lock.
        GET_METRIC(tiflash_storage_s3_lock_mgr_status, type_prelock_keys).Set(remaining_pre_lock_keys);
    }
    return {erase_hit, erase_miss, remaining_pre_lock_keys};
}

/// After a write batch is applied, the manifest can recover the S3 lock keys
/// from `VersionedPageEntries`, so the corresponding pre-lock entries are no
/// longer needed and are removed here.
/// Note: the old inline erase loop (diff residue) was removed — erasing is
/// delegated to `cleanPreLockKeysImpl` so the keys are not double-processed
/// under a second lock acquisition.
void S3LockLocalManager::cleanAppliedS3ExternalFiles(std::unordered_set<String> && applied_s3files)
{
    auto [erase_hit, erase_miss, remaining_pre_lock_keys] = cleanPreLockKeysImpl(applied_s3files);
    // A miss usually indicates the key was already cleaned elsewhere; log it louder.
    const auto log_lvl = erase_miss > 0 ? Poco::Message::PRIO_WARNING : Poco::Message::PRIO_DEBUG;
    LOG_IMPL(
        log,
        log_lvl,
        "Clean applied S3 external files, applied_count={} erase_hit={} erase_miss={} remaining_pre_lock_keys={}",
        applied_s3files.size(),
        erase_hit,
        erase_miss,
        remaining_pre_lock_keys);
    GET_METRIC(tiflash_storage_s3_lock_mgr_counter, type_clean_lock).Increment();
    GET_METRIC(tiflash_storage_s3_lock_mgr_counter, type_clean_lock_erase_hit).Increment(erase_hit);
    GET_METRIC(tiflash_storage_s3_lock_mgr_counter, type_clean_lock_erase_miss).Increment(erase_miss);
}

/// If a write fails after pre-lock keys were registered, drop those keys so
/// stale entries do not linger in `pre_lock_keys` and keep dead S3 data files
/// alive in the next uploaded manifest.
void S3LockLocalManager::cleanPreLockKeysOnWriteFailure(std::unordered_set<String> && pre_lock_keys_on_failure)
{
    const auto [erase_hit, erase_miss, remaining_pre_lock_keys] = cleanPreLockKeysImpl(pre_lock_keys_on_failure);
    // Surface misses as warnings; a miss means the key was already removed by
    // some other path, which is worth investigating.
    const auto level = (erase_miss > 0) ? Poco::Message::PRIO_WARNING : Poco::Message::PRIO_DEBUG;
    LOG_IMPL(
        log,
        level,
        "Clean pre-lock keys on write failure, requested={} erase_hit={} erase_miss={} remaining_pre_lock_keys={}",
        pre_lock_keys_on_failure.size(),
        erase_hit,
        erase_miss,
        remaining_pre_lock_keys);
    // Counter updates are independent of each other; order is irrelevant.
    GET_METRIC(tiflash_storage_s3_lock_mgr_counter, type_clean_lock).Increment();
    GET_METRIC(tiflash_storage_s3_lock_mgr_counter, type_clean_lock_erase_miss).Increment(erase_miss);
    GET_METRIC(tiflash_storage_s3_lock_mgr_counter, type_clean_lock_erase_hit).Increment(erase_hit);
}

} // namespace DB::PS::V3
10 changes: 9 additions & 1 deletion dbms/src/Storages/Page/V3/Universal/S3LockLocalManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
#include <Storages/S3/S3Filename.h>
#include <aws/s3/S3Client.h>

#include <tuple>


namespace DB::S3
{
Expand Down Expand Up @@ -59,15 +61,21 @@ class S3LockLocalManager
};
ExtraLockInfo allocateNewUploadLocksInfo();

void createS3LockForWriteBatch(UniversalWriteBatch & write_batch);
std::unordered_set<String> createS3LockForWriteBatch(UniversalWriteBatch & write_batch);

// after write batch applied, we can clean the applied locks from `pre_locks_files`
void cleanAppliedS3ExternalFiles(std::unordered_set<String> && applied_s3files);

// If write fails after creating pre-locks, clean these pre-lock keys to avoid residual entries.
void cleanPreLockKeysOnWriteFailure(std::unordered_set<String> && pre_lock_keys_on_failure);

DISALLOW_COPY_AND_MOVE(S3LockLocalManager);


private:
std::tuple<std::size_t, std::size_t, std::size_t>
cleanPreLockKeysImpl(const std::unordered_set<String> & lock_keys_to_clean);

// return the s3 lock_key
String createS3Lock(const String & datafile_key, const S3::S3FilenameView & s3_file, UInt64 lock_store_id);

Expand Down
Loading