Skip to content
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
6ead46d
Add metrics and logging
JaySon-Huang Mar 21, 2026
66a18b4
metrics: Update grafana panel
JaySon-Huang Mar 21, 2026
14140e0
Add debug logging
JaySon-Huang Mar 21, 2026
40ac415
tests: add write-group remote apply behavior coverage
JaySon-Huang Mar 21, 2026
5856c85
fix(page): return writer-scoped applied remote lock ids
JaySon-Huang Mar 21, 2026
d2b5ace
cleanup(page): remove investigation logs and keep actionable signals
JaySon-Huang Mar 22, 2026
f9b1b68
refactor(page): simplify remote write and lock cleanup logging
JaySon-Huang Mar 22, 2026
9aad725
docs: refine exception and logging guidance
JaySon-Huang Mar 22, 2026
998eb35
refactor(page): centralize remote write count handling
JaySon-Huang Mar 22, 2026
5a1e219
fix(page): cleanup pre-lock keys on write failure
JaySon-Huang Mar 22, 2026
83df9b1
tests(page): document purposes and steps for lock cleanup cases
JaySon-Huang Mar 22, 2026
98d0036
tests(page): force async launch for syncpoint concurrency cases
JaySon-Huang Mar 22, 2026
b360e87
fix(page): avoid partial pre-lock residue on lock creation
JaySon-Huang Mar 22, 2026
7f0b357
tests(page): cover lock return semantics and partial failure
JaySon-Huang Mar 22, 2026
bda0e5a
tests(page): enforce async launch in write-group syncpoint tests
JaySon-Huang Mar 22, 2026
48d3aa7
Format codes
JaySon-Huang Mar 22, 2026
ed82d79
enhance(s3gc): add owner-only periodic storage summary task
JaySon-Huang Mar 22, 2026
7b09efe
metrics: Update grafana panel about tiflash_storage_s3_store_summary_…
JaySon-Huang Mar 22, 2026
b007911
Revert "enhance(s3gc): add owner-only periodic storage summary task"
JaySon-Huang Mar 23, 2026
7393170
Revert the changes in grafana
JaySon-Huang Mar 23, 2026
c71feec
Address comment
JaySon-Huang Mar 23, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,12 @@ TiFlash follows a style based on Google, enforced by `clang-format` 17.0.0+.
- **Smart Pointers:** Prefer `std::shared_ptr` and `std::unique_ptr`. Use `std::make_shared` and `std::make_unique`.
- **Error Handling:**
- Use `DB::Exception`.
- Pattern: `throw Exception("Message", ErrorCodes::SOME_CODE);`
- Prefer the fmt-style constructor with error code first: `throw Exception(ErrorCodes::SOME_CODE, "Message with {}", arg);`
- For fixed strings without formatting, `throw Exception("Message", ErrorCodes::SOME_CODE);` is still acceptable.
- Error codes are defined in `dbms/src/Common/ErrorCodes.cpp` and `errors.toml`.
- In broad `catch (...)` paths, prefer `tryLogCurrentException(log, "context")` to avoid duplicated exception-formatting code.
- **Logging:** Use macros like `LOG_INFO(log, "message {}", arg)`. `log` is usually a `DB::LoggerPtr`.
- When only log level differs by runtime condition, prefer `LOG_IMPL(log, level, ...)` (with `Poco::Message::Priority`) instead of duplicated `if/else` log blocks.

### Modern C++ Practices
- Prefer `auto` for complex iterators/templates.
Expand Down
9 changes: 9 additions & 0 deletions dbms/src/Common/TiFlashMetrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -846,6 +846,15 @@ static_assert(RAFT_REGION_BIG_WRITE_THRES * 4 < RAFT_REGION_BIG_WRITE_MAX, "Inva
F(type_to_finished, {"type", "to_finished"}), \
F(type_to_error, {"type", "to_error"}), \
F(type_to_cancelled, {"type", "to_cancelled"})) \
M(tiflash_storage_s3_lock_mgr_status, "S3 Lock Manager", Gauge, F(type_prelock_keys, {{"type", "prelock_keys"}})) \
M(tiflash_storage_s3_lock_mgr_counter, \
"S3 Lock Manager Counter", \
Counter, \
F(type_create_lock_local, {{"type", "create_lock_local"}}), \
F(type_create_lock_ingest, {{"type", "create_lock_ingest"}}), \
F(type_clean_lock, {{"type", "clean_lock"}}), \
F(type_clean_lock_erase_hit, {{"type", "clean_lock_erase_hit"}}), \
F(type_clean_lock_erase_miss, {{"type", "clean_lock_erase_miss"}})) \
M(tiflash_storage_s3_gc_status, \
"S3 GC status", \
Gauge, \
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1283,6 +1283,10 @@ SegmentPtr DeltaMergeStore::segmentMergeDelta(

if (!isSegmentValid(lock, segment))
{
LOG_INFO(
log,
"MergeDelta - Give up segmentMergeDelta because segment not valid, reason=concurrent_update segment={}",
segment->simpleInfo());
LOG_DEBUG(
log,
"MergeDelta - Give up segmentMergeDelta because segment not valid, segment={}",
Expand Down
6 changes: 6 additions & 0 deletions dbms/src/Storages/DeltaMerge/WriteBatchesImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,12 @@ struct WriteBatches : private boost::noncopyable

void rollbackWrittenLogAndData()
{
LOG_INFO(
Logger::get("WriteBatches"),
"Rollback written log/data, run_mode={} written_log_pages={} written_data_pages={}",
magic_enum::enum_name(run_mode),
written_log.size(),
written_data.size());
WriteBatchWrapper log_wb(run_mode, keyspace_id, StorageType::Log, ns_id);
for (auto p : written_log)
log_wb.delPage(p);
Expand Down
25 changes: 14 additions & 11 deletions dbms/src/Storages/Page/V3/PageDirectory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
#include <shared_mutex>
#include <type_traits>
#include <utility>
#include <vector>


#ifdef FIU_ENABLE
Expand Down Expand Up @@ -1612,6 +1613,13 @@ std::unordered_set<String> PageDirectory<Trait>::apply(PageEntriesEdit && edit,
CurrentMetrics::Increment pending_writer_size{CurrentMetrics::PSPendingWriterNum};
Writer w;
w.edit = &edit;
// Capture this writer's checkpoint data_file_ids before write-group merge.
// Followers' edit objects are cleared by the owner during merge.
for (const auto & r : edit.getRecords())
{
if (r.entry.checkpoint_info.has_value())
w.applied_data_files.emplace(*r.entry.checkpoint_info.data_location.data_file_id);
}

Stopwatch watch;
std::unique_lock apply_lock(apply_mutex);
Expand Down Expand Up @@ -1639,9 +1647,9 @@ std::unordered_set<String> PageDirectory<Trait>::apply(PageEntriesEdit && edit,
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown exception");
}
}
// the `applied_data_files` will be returned by the write
// group owner, others just return an empty set.
return {};
// Return per-writer ids instead of merged-group ids, so upper-layer
// lock cleanup can always clean locks created by this writer.
return std::move(w.applied_data_files);
}

/// This thread now is the write group owner, build the group. It will merge the
Expand Down Expand Up @@ -1703,7 +1711,6 @@ std::unordered_set<String> PageDirectory<Trait>::apply(PageEntriesEdit && edit,
});

SYNC_FOR("before_PageDirectory::apply_to_memory");
std::unordered_set<String> applied_data_files;
{
std::unique_lock table_lock(table_rw_mutex);

Expand Down Expand Up @@ -1775,12 +1782,6 @@ std::unordered_set<String> PageDirectory<Trait>::apply(PageEntriesEdit && edit,
"should not handle edit with invalid type, type={}",
magic_enum::enum_name(r.type));
}

// collect the applied remote data_file_ids
if (r.entry.checkpoint_info.has_value())
{
applied_data_files.emplace(*r.entry.checkpoint_info.data_location.data_file_id);
}
}
catch (DB::Exception & e)
{
Expand All @@ -1800,7 +1801,9 @@ std::unordered_set<String> PageDirectory<Trait>::apply(PageEntriesEdit && edit,
}

success = true;
return applied_data_files;
// Even for write-group owner, return only this writer's pre-captured ids.
// Other writers return their own ids in the `w.done` branch above.
return std::move(w.applied_data_files);
}

template <typename Trait>
Expand Down
3 changes: 3 additions & 0 deletions dbms/src/Storages/Page/V3/PageDirectory.h
Original file line number Diff line number Diff line change
Expand Up @@ -628,6 +628,9 @@ class PageDirectory
struct Writer
{
PageEntriesEdit * edit;
// Keep per-writer checkpoint lock keys before write-group merge so
// followers can still return their own applied ids for lock cleanup.
std::unordered_set<String> applied_data_files;
bool done = false; // The work has been performed by other thread
bool success = false; // The work complete successfully
std::unique_ptr<DB::Exception> exception;
Expand Down
74 changes: 65 additions & 9 deletions dbms/src/Storages/Page/V3/Universal/S3LockLocalManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#include <Common/Exception.h>
#include <Common/Stopwatch.h>
#include <Common/TiFlashMetrics.h>
#include <Flash/Disaggregated/S3LockClient.h>
#include <Storages/Page/V3/CheckpointFile/CPManifestFileReader.h>
#include <Storages/Page/V3/CheckpointFile/Proto/manifest_file.pb.h>
Expand All @@ -23,6 +24,8 @@
#include <Storages/S3/S3Common.h>
#include <Storages/S3/S3Filename.h>
#include <Storages/S3/S3RandomAccessFile.h>
#include <Poco/Message.h>
#include <common/logger_useful.h>

#include <magic_enum.hpp>

Expand All @@ -40,7 +43,7 @@ S3LockLocalManager::S3LockLocalManager()
{}

// `store_id` is inited later because they may not
// accessable when S3LockLocalManager is created.
// accessible when S3LockLocalManager is created.
std::optional<CheckpointProto::ManifestFilePrefix> S3LockLocalManager::initStoreInfo(
StoreID actual_store_id,
DB::S3::S3LockClientPtr s3lock_client_,
Expand Down Expand Up @@ -224,7 +227,8 @@ String S3LockLocalManager::createS3Lock(
// TODO: handle s3 network error and retry?
auto s3_client = S3::ClientFactory::instance().sharedTiFlashClient();
S3::uploadEmptyFile(*s3_client, lockkey);
LOG_DEBUG(log, "S3 lock created for local datafile, lockkey={}", lockkey);
GET_METRIC(tiflash_storage_s3_lock_mgr_counter, type_create_lock_local).Increment();
LOG_DEBUG(log, "S3 lock created for local datafile, datafile_key={} lockkey={}", datafile_key, lockkey);
}
else
{
Expand All @@ -237,7 +241,8 @@ String S3LockLocalManager::createS3Lock(
{
throw Exception(ErrorCodes::S3_LOCK_CONFLICT, err_msg);
}
LOG_DEBUG(log, "S3 lock created for ingest datafile, lockkey={}", lockkey);
GET_METRIC(tiflash_storage_s3_lock_mgr_counter, type_create_lock_ingest).Increment();
LOG_DEBUG(log, "S3 lock created for ingest datafile, datafile_key={} lockkey={}", datafile_key, lockkey);
}

// The related S3 data files in write batch is not applied into PageDirectory,
Expand All @@ -247,19 +252,70 @@ String S3LockLocalManager::createS3Lock(
{
std::unique_lock wlatch_keys(mtx_lock_keys);
pre_lock_keys.emplace(lockkey);
GET_METRIC(tiflash_storage_s3_lock_mgr_status, type_prelock_keys).Set(pre_lock_keys.size());
}
return lockkey;
}

void S3LockLocalManager::cleanAppliedS3ExternalFiles(std::unordered_set<String> && applied_s3files)
std::tuple<std::size_t, std::size_t, std::size_t>
S3LockLocalManager::cleanPreLockKeysImpl(const std::unordered_set<String> & lock_keys_to_clean)
{
// After the entries applied into PageDirectory, manifest can get the S3 lock key
// from `VersionedPageEntries`, cleanup the pre lock files.
std::unique_lock wlatch_keys(mtx_lock_keys);
for (const auto & file : applied_s3files)
size_t erase_hit = 0;
size_t erase_miss = 0;
size_t remaining_pre_lock_keys = 0;
{
pre_lock_keys.erase(file);
// After the entries applied into PageDirectory, manifest can get the S3 lock key
// from `VersionedPageEntries`, cleanup the pre lock files.
std::unique_lock wlatch_keys(mtx_lock_keys);
for (const auto & file : lock_keys_to_clean)
{
if (pre_lock_keys.erase(file) > 0)
{
++erase_hit;
}
else
{
++erase_miss;
}
}
remaining_pre_lock_keys = pre_lock_keys.size();
GET_METRIC(tiflash_storage_s3_lock_mgr_status, type_prelock_keys).Set(remaining_pre_lock_keys);
}
return {erase_hit, erase_miss, remaining_pre_lock_keys};
}

void S3LockLocalManager::cleanAppliedS3ExternalFiles(std::unordered_set<String> && applied_s3files)
{
    // Drop the lock keys whose entries are now applied into PageDirectory from
    // the pre-lock set. A miss means the key was already absent, which is
    // unexpected and therefore surfaced at WARNING level.
    const auto [hits, misses, remaining] = cleanPreLockKeysImpl(applied_s3files);
    GET_METRIC(tiflash_storage_s3_lock_mgr_counter, type_clean_lock).Increment();
    GET_METRIC(tiflash_storage_s3_lock_mgr_counter, type_clean_lock_erase_hit).Increment(hits);
    GET_METRIC(tiflash_storage_s3_lock_mgr_counter, type_clean_lock_erase_miss).Increment(misses);
    // Only the log level depends on the outcome, so use LOG_IMPL instead of
    // duplicated if/else log blocks.
    const auto level = misses > 0 ? Poco::Message::PRIO_WARNING : Poco::Message::PRIO_DEBUG;
    LOG_IMPL(
        log,
        level,
        "Clean applied S3 external files, applied_count={} erase_hit={} erase_miss={} remaining_pre_lock_keys={}",
        applied_s3files.size(),
        hits,
        misses,
        remaining);
}

void S3LockLocalManager::cleanPreLockKeysOnWriteFailure(std::unordered_set<String> && pre_lock_keys_on_failure)
{
    // A write batch failed after its S3 locks were pre-created; erase those
    // keys so they do not linger in the pre-lock set. A miss means the key was
    // already gone, which is unexpected and therefore surfaced at WARNING level.
    const auto [hits, misses, remaining] = cleanPreLockKeysImpl(pre_lock_keys_on_failure);
    GET_METRIC(tiflash_storage_s3_lock_mgr_counter, type_clean_lock).Increment();
    GET_METRIC(tiflash_storage_s3_lock_mgr_counter, type_clean_lock_erase_hit).Increment(hits);
    GET_METRIC(tiflash_storage_s3_lock_mgr_counter, type_clean_lock_erase_miss).Increment(misses);
    // Only the log level depends on the outcome, so use LOG_IMPL instead of
    // duplicated if/else log blocks.
    const auto level = misses > 0 ? Poco::Message::PRIO_WARNING : Poco::Message::PRIO_DEBUG;
    LOG_IMPL(
        log,
        level,
        "Clean pre-lock keys on write failure, requested={} erase_hit={} erase_miss={} remaining_pre_lock_keys={}",
        pre_lock_keys_on_failure.size(),
        hits,
        misses,
        remaining);
}

} // namespace DB::PS::V3
8 changes: 8 additions & 0 deletions dbms/src/Storages/Page/V3/Universal/S3LockLocalManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
#include <Storages/S3/S3Filename.h>
#include <aws/s3/S3Client.h>

#include <tuple>


namespace DB::S3
{
Expand Down Expand Up @@ -64,10 +66,16 @@ class S3LockLocalManager
// after write batch applied, we can clean the applied locks from `pre_locks_files`
void cleanAppliedS3ExternalFiles(std::unordered_set<String> && applied_s3files);

// If write fails after creating pre-locks, clean these pre-lock keys to avoid residual entries.
void cleanPreLockKeysOnWriteFailure(std::unordered_set<String> && pre_lock_keys_on_failure);

DISALLOW_COPY_AND_MOVE(S3LockLocalManager);


private:
std::tuple<std::size_t, std::size_t, std::size_t>
cleanPreLockKeysImpl(const std::unordered_set<String> & lock_keys_to_clean);

// return the s3 lock_key
String createS3Lock(const String & datafile_key, const S3::S3FilenameView & s3_file, UInt64 lock_store_id);

Expand Down
52 changes: 47 additions & 5 deletions dbms/src/Storages/Page/V3/Universal/UniversalPageStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#include <common/logger_useful.h>
#include <fiu.h>

#include <functional>
#include <mutex>

namespace DB
Expand Down Expand Up @@ -109,21 +110,62 @@ void UniversalPageStorage::write(
Stopwatch watch;
SCOPE_EXIT(
{ GET_METRIC(tiflash_storage_page_write_duration_seconds, type_total).Observe(watch.elapsedSeconds()); });
bool has_writes_from_remote = write_batch.hasWritesFromRemote();
if (has_writes_from_remote)
const size_t remote_lock_key_count = write_batch.writesRemoteCount();
std::unordered_set<String> created_pre_lock_keys;
if (remote_lock_key_count > 0)
{
assert(remote_locks_local_mgr != nullptr);
// Before ingesting remote pages/remote external pages, we need to create "lock" on S3
// to ensure the correctness between FAP and S3GC.
// If any "lock" fails to be created, then it will throw an exception.
// Note that if `remote_locks_local_mgr`'s store_id is not inited, it will block until inited
remote_locks_local_mgr->createS3LockForWriteBatch(write_batch);

// After lock creation, write batch keeps lock keys in data_location.data_file_id.
created_pre_lock_keys.reserve(remote_lock_key_count);
for (const auto & w : write_batch.getWrites())
{
if ((w.type == WriteBatchWriteType::PUT_EXTERNAL || w.type == WriteBatchWriteType::PUT_REMOTE)
&& w.data_location.has_value())
{
created_pre_lock_keys.emplace(*w.data_location->data_file_id);
}
}
}
std::unordered_set<String> applied_lock_ids;
const char * failed_stage = "blob_store->write";
try
{
auto edit = blob_store->write(std::move(write_batch), page_type, write_limiter);
failed_stage = "page_directory->apply";
applied_lock_ids = page_directory->apply(std::move(edit), write_limiter);
}
auto edit = blob_store->write(std::move(write_batch), page_type, write_limiter);
auto applied_lock_ids = page_directory->apply(std::move(edit), write_limiter);
if (has_writes_from_remote)
catch (...)
{
if (remote_lock_key_count > 0)
{
// If write fails after pre-lock creation, clean these pre-lock keys
// to avoid residual entries in checkpoint_manager.pre_lock_keys.
remote_locks_local_mgr->cleanPreLockKeysOnWriteFailure(std::move(created_pre_lock_keys));
tryLogCurrentException(
log,
fmt::format(
"Remote write batch failed, stage={} lock_key_count={}",
failed_stage,
remote_lock_key_count));
}
throw;
}
if (remote_lock_key_count > 0)
{
assert(remote_locks_local_mgr != nullptr);
if (applied_lock_ids.empty())
{
LOG_WARNING(
log,
"Remote write batch has lock keys but no applied lock ids, lock_key_count={}",
remote_lock_key_count);
}
// Remove the applied locks from checkpoint_manager.pre_lock_keys
remote_locks_local_mgr->cleanAppliedS3ExternalFiles(std::move(applied_lock_ids));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,11 @@ bool UniversalPageStorageService::uploadCheckpointImpl(
{
LOG_INFO(log, "Upload checkpoint with all existing data");
}
LOG_INFO(
log,
"Checkpoint lock set, upload_sequence={} pre_lock_keys_size={}",
upload_info.upload_sequence,
upload_info.pre_lock_keys.size());
UniversalPageStorage::DumpCheckpointOptions opts{
.data_file_id_pattern = S3::S3Filename::newCheckpointDataNameTemplate(store_info.id(), upload_info.upload_sequence),
.data_file_path_pattern = local_dir_str + "dat_{seq}_{index}",
Expand Down
Loading