Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
21 commits
Select commit. Hold shift + click to select a range.
6ead46d
Add metrics and logging
JaySon-Huang Mar 21, 2026
66a18b4
metrics: Update grafana panel
JaySon-Huang Mar 21, 2026
14140e0
Add debug logging
JaySon-Huang Mar 21, 2026
40ac415
tests: add write-group remote apply behavior coverage
JaySon-Huang Mar 21, 2026
5856c85
fix(page): return writer-scoped applied remote lock ids
JaySon-Huang Mar 21, 2026
d2b5ace
cleanup(page): remove investigation logs and keep actionable signals
JaySon-Huang Mar 22, 2026
f9b1b68
refactor(page): simplify remote write and lock cleanup logging
JaySon-Huang Mar 22, 2026
9aad725
docs: refine exception and logging guidance
JaySon-Huang Mar 22, 2026
998eb35
refactor(page): centralize remote write count handling
JaySon-Huang Mar 22, 2026
5a1e219
fix(page): cleanup pre-lock keys on write failure
JaySon-Huang Mar 22, 2026
83df9b1
tests(page): document purposes and steps for lock cleanup cases
JaySon-Huang Mar 22, 2026
98d0036
tests(page): force async launch for syncpoint concurrency cases
JaySon-Huang Mar 22, 2026
b360e87
fix(page): avoid partial pre-lock residue on lock creation
JaySon-Huang Mar 22, 2026
7f0b357
tests(page): cover lock return semantics and partial failure
JaySon-Huang Mar 22, 2026
bda0e5a
tests(page): enforce async launch in write-group syncpoint tests
JaySon-Huang Mar 22, 2026
48d3aa7
Format codes
JaySon-Huang Mar 22, 2026
ed82d79
enhance(s3gc): add owner-only periodic storage summary task
JaySon-Huang Mar 22, 2026
7b09efe
metrics: Update grafana panel about tiflash_storage_s3_store_summary_…
JaySon-Huang Mar 22, 2026
b007911
Revert "enhance(s3gc): add owner-only periodic storage summary task"
JaySon-Huang Mar 23, 2026
7393170
Revert the changes in grafana
JaySon-Huang Mar 23, 2026
c71feec
Address comment
JaySon-Huang Mar 23, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions dbms/src/Common/TiFlashMetrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -846,6 +846,15 @@ static_assert(RAFT_REGION_BIG_WRITE_THRES * 4 < RAFT_REGION_BIG_WRITE_MAX, "Inva
F(type_to_finished, {"type", "to_finished"}), \
F(type_to_error, {"type", "to_error"}), \
F(type_to_cancelled, {"type", "to_cancelled"})) \
M(tiflash_storage_s3_lock_mgr_status, "S3 Lock Manager", Gauge, F(type_prelock_keys, {{"type", "prelock_keys"}})) \
M(tiflash_storage_s3_lock_mgr_counter, \
"S3 Lock Manager Counter", \
Counter, \
F(type_create_lock_local, {{"type", "create_lock_local"}}), \
F(type_create_lock_ingest, {{"type", "create_lock_ingest"}}), \
F(type_clean_lock, {{"type", "clean_lock"}}), \
F(type_clean_lock_erase_hit, {{"type", "clean_lock_erase_hit"}}), \
F(type_clean_lock_erase_miss, {{"type", "clean_lock_erase_miss"}})) \
M(tiflash_storage_s3_gc_status, \
"S3 GC status", \
Gauge, \
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1283,6 +1283,10 @@ SegmentPtr DeltaMergeStore::segmentMergeDelta(

if (!isSegmentValid(lock, segment))
{
LOG_INFO(
log,
"MergeDelta - Give up segmentMergeDelta because segment not valid, reason=concurrent_update segment={}",
segment->simpleInfo());
LOG_DEBUG(
log,
"MergeDelta - Give up segmentMergeDelta because segment not valid, segment={}",
Expand Down
6 changes: 6 additions & 0 deletions dbms/src/Storages/DeltaMerge/WriteBatchesImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,12 @@ struct WriteBatches : private boost::noncopyable

void rollbackWrittenLogAndData()
{
LOG_INFO(
Logger::get("WriteBatches"),
"Rollback written log/data, run_mode={} written_log_pages={} written_data_pages={}",
magic_enum::enum_name(run_mode),
written_log.size(),
written_data.size());
WriteBatchWrapper log_wb(run_mode, keyspace_id, StorageType::Log, ns_id);
for (auto p : written_log)
log_wb.delPage(p);
Expand Down
25 changes: 14 additions & 11 deletions dbms/src/Storages/Page/V3/PageDirectory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
#include <shared_mutex>
#include <type_traits>
#include <utility>
#include <vector>


#ifdef FIU_ENABLE
Expand Down Expand Up @@ -1612,6 +1613,13 @@ std::unordered_set<String> PageDirectory<Trait>::apply(PageEntriesEdit && edit,
CurrentMetrics::Increment pending_writer_size{CurrentMetrics::PSPendingWriterNum};
Writer w;
w.edit = &edit;
// Capture this writer's checkpoint data_file_ids before write-group merge.
// Followers' edit objects are cleared by the owner during merge.
for (const auto & r : edit.getRecords())
{
if (r.entry.checkpoint_info.has_value())
w.applied_data_files.emplace(*r.entry.checkpoint_info.data_location.data_file_id);
}

Stopwatch watch;
std::unique_lock apply_lock(apply_mutex);
Expand Down Expand Up @@ -1639,9 +1647,9 @@ std::unordered_set<String> PageDirectory<Trait>::apply(PageEntriesEdit && edit,
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown exception");
}
}
// the `applied_data_files` will be returned by the write
// group owner, others just return an empty set.
return {};
// Return per-writer ids instead of merged-group ids, so upper-layer
// lock cleanup can always clean locks created by this writer.
return std::move(w.applied_data_files);
}

/// This thread now is the write group owner, build the group. It will merge the
Expand Down Expand Up @@ -1703,7 +1711,6 @@ std::unordered_set<String> PageDirectory<Trait>::apply(PageEntriesEdit && edit,
});

SYNC_FOR("before_PageDirectory::apply_to_memory");
std::unordered_set<String> applied_data_files;
{
std::unique_lock table_lock(table_rw_mutex);

Expand Down Expand Up @@ -1775,12 +1782,6 @@ std::unordered_set<String> PageDirectory<Trait>::apply(PageEntriesEdit && edit,
"should not handle edit with invalid type, type={}",
magic_enum::enum_name(r.type));
}

// collect the applied remote data_file_ids
if (r.entry.checkpoint_info.has_value())
{
applied_data_files.emplace(*r.entry.checkpoint_info.data_location.data_file_id);
}
}
catch (DB::Exception & e)
{
Expand All @@ -1800,7 +1801,9 @@ std::unordered_set<String> PageDirectory<Trait>::apply(PageEntriesEdit && edit,
}

success = true;
return applied_data_files;
// Even for write-group owner, return only this writer's pre-captured ids.
// Other writers return their own ids in the `w.done` branch above.
return std::move(w.applied_data_files);
}

template <typename Trait>
Expand Down
3 changes: 3 additions & 0 deletions dbms/src/Storages/Page/V3/PageDirectory.h
Original file line number Diff line number Diff line change
Expand Up @@ -628,6 +628,9 @@ class PageDirectory
struct Writer
{
PageEntriesEdit * edit;
// Keep per-writer checkpoint lock keys before write-group merge so
// followers can still return their own applied ids for lock cleanup.
std::unordered_set<String> applied_data_files;
bool done = false; // The work has been performed by other thread
bool success = false; // The work complete successfully
std::unique_ptr<DB::Exception> exception;
Expand Down
57 changes: 49 additions & 8 deletions dbms/src/Storages/Page/V3/Universal/S3LockLocalManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#include <Common/Exception.h>
#include <Common/Stopwatch.h>
#include <Common/TiFlashMetrics.h>
#include <Flash/Disaggregated/S3LockClient.h>
#include <Storages/Page/V3/CheckpointFile/CPManifestFileReader.h>
#include <Storages/Page/V3/CheckpointFile/Proto/manifest_file.pb.h>
Expand All @@ -40,7 +41,7 @@ S3LockLocalManager::S3LockLocalManager()
{}

// `store_id` is inited later because they may not
// accessable when S3LockLocalManager is created.
// accessible when S3LockLocalManager is created.
std::optional<CheckpointProto::ManifestFilePrefix> S3LockLocalManager::initStoreInfo(
StoreID actual_store_id,
DB::S3::S3LockClientPtr s3lock_client_,
Expand Down Expand Up @@ -224,7 +225,8 @@ String S3LockLocalManager::createS3Lock(
// TODO: handle s3 network error and retry?
auto s3_client = S3::ClientFactory::instance().sharedTiFlashClient();
S3::uploadEmptyFile(*s3_client, lockkey);
LOG_DEBUG(log, "S3 lock created for local datafile, lockkey={}", lockkey);
GET_METRIC(tiflash_storage_s3_lock_mgr_counter, type_create_lock_local).Increment();
LOG_DEBUG(log, "S3 lock created for local datafile, datafile_key={} lockkey={}", datafile_key, lockkey);
}
else
{
Expand All @@ -237,7 +239,8 @@ String S3LockLocalManager::createS3Lock(
{
throw Exception(ErrorCodes::S3_LOCK_CONFLICT, err_msg);
}
LOG_DEBUG(log, "S3 lock created for ingest datafile, lockkey={}", lockkey);
GET_METRIC(tiflash_storage_s3_lock_mgr_counter, type_create_lock_ingest).Increment();
LOG_DEBUG(log, "S3 lock created for ingest datafile, datafile_key={} lockkey={}", datafile_key, lockkey);
}

// The related S3 data files in write batch is not applied into PageDirectory,
Expand All @@ -247,19 +250,57 @@ String S3LockLocalManager::createS3Lock(
{
std::unique_lock wlatch_keys(mtx_lock_keys);
pre_lock_keys.emplace(lockkey);
GET_METRIC(tiflash_storage_s3_lock_mgr_status, type_prelock_keys).Set(pre_lock_keys.size());
}
return lockkey;
}

// Remove the lock keys of `applied_s3files` from the pre-lock set once their
// entries have been applied into PageDirectory. After apply, the manifest can
// obtain the S3 lock key from `VersionedPageEntries`, so the pre-lock entry is
// no longer needed to protect the data file from GC.
//
// @param applied_s3files  lock keys whose write batch has been applied; each is
//        erased from `pre_lock_keys`. A miss (key not present) is tolerated but
//        logged at WARNING level, since it may indicate a lock created by one
//        writer being cleaned by another — TODO confirm with write-group callers.
void S3LockLocalManager::cleanAppliedS3ExternalFiles(std::unordered_set<String> && applied_s3files)
{
    size_t erase_hit = 0;
    size_t erase_miss = 0;
    size_t remaining_pre_lock_keys = 0;
    {
        // After the entries applied into PageDirectory, manifest can get the S3 lock key
        // from `VersionedPageEntries`, cleanup the pre lock files.
        std::unique_lock wlatch_keys(mtx_lock_keys);
        for (const auto & file : applied_s3files)
        {
            if (pre_lock_keys.erase(file) > 0)
            {
                ++erase_hit;
            }
            else
            {
                ++erase_miss;
            }
        }
        remaining_pre_lock_keys = pre_lock_keys.size();
        GET_METRIC(tiflash_storage_s3_lock_mgr_status, type_prelock_keys).Set(remaining_pre_lock_keys);
    } // release the lock on mtx_lock_keys before logging
    // Misses are surfaced at WARNING so operators can spot unexpected cleanup;
    // the all-hit path stays at DEBUG to keep logs quiet in the common case.
    if (erase_miss > 0)
    {
        LOG_WARNING(
            log,
            "Clean applied S3 external files, applied_count={} erase_hit={} erase_miss={} remaining_pre_lock_keys={}",
            applied_s3files.size(),
            erase_hit,
            erase_miss,
            remaining_pre_lock_keys);
    }
    else
    {
        LOG_DEBUG(
            log,
            "Clean applied S3 external files, applied_count={} erase_hit={} erase_miss={} remaining_pre_lock_keys={}",
            applied_s3files.size(),
            erase_hit,
            erase_miss,
            remaining_pre_lock_keys);
    }
    GET_METRIC(tiflash_storage_s3_lock_mgr_counter, type_clean_lock).Increment();
    GET_METRIC(tiflash_storage_s3_lock_mgr_counter, type_clean_lock_erase_hit).Increment(erase_hit);
    GET_METRIC(tiflash_storage_s3_lock_mgr_counter, type_clean_lock_erase_miss).Increment(erase_miss);
}

} // namespace DB::PS::V3
108 changes: 106 additions & 2 deletions dbms/src/Storages/Page/V3/Universal/UniversalPageStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,11 @@
#include <common/logger_useful.h>
#include <fiu.h>

#include <atomic>
#include <exception>
#include <functional>
#include <mutex>
#include <vector>

namespace DB
{
Expand Down Expand Up @@ -110,6 +114,9 @@ void UniversalPageStorage::write(
SCOPE_EXIT(
{ GET_METRIC(tiflash_storage_page_write_duration_seconds, type_total).Observe(watch.elapsedSeconds()); });
bool has_writes_from_remote = write_batch.hasWritesFromRemote();
static std::atomic_uint64_t remote_write_trace_id{0};
const auto trace_id = has_writes_from_remote ? remote_write_trace_id.fetch_add(1, std::memory_order_relaxed) + 1 : 0;
size_t remote_lock_key_count = 0;
if (has_writes_from_remote)
{
assert(remote_locks_local_mgr != nullptr);
Expand All @@ -118,12 +125,109 @@ void UniversalPageStorage::write(
// If any "lock" failed to be created, then it will throw exception.
// Note that if `remote_locks_local_mgr`'s store_id is not inited, it will blocks until inited
remote_locks_local_mgr->createS3LockForWriteBatch(write_batch);
for (const auto & w : write_batch.getWrites())
{
switch (w.type)
{
case WriteBatchWriteType::PUT_EXTERNAL:
case WriteBatchWriteType::PUT_REMOTE:
if (w.data_location.has_value())
++remote_lock_key_count;
break;
default:
break;
}
}
LOG_DEBUG(
log,
"Remote write batch begin, trace_id={} lock_key_count={}",
trace_id,
remote_lock_key_count);
}
auto edit = blob_store->write(std::move(write_batch), page_type, write_limiter);
auto applied_lock_ids = page_directory->apply(std::move(edit), write_limiter);
auto edit = [&]() {
try
{
return blob_store->write(std::move(write_batch), page_type, write_limiter);
}
catch (const DB::Exception & e)
{
if (has_writes_from_remote)
{
LOG_WARNING(
log,
"Remote write batch failed at blob_store->write, trace_id={} lock_key_count={} err={}",
trace_id,
remote_lock_key_count,
e.message());
}
throw;
}
catch (const std::exception & e)
{
if (has_writes_from_remote)
{
LOG_WARNING(
log,
"Remote write batch failed at blob_store->write, trace_id={} lock_key_count={} err={}",
trace_id,
remote_lock_key_count,
e.what());
}
throw;
}
}();
auto applied_lock_ids = [&]() {
try
{
return page_directory->apply(std::move(edit), write_limiter);
}
catch (const DB::Exception & e)
{
if (has_writes_from_remote)
{
LOG_WARNING(
log,
"Remote write batch failed at page_directory->apply, trace_id={} lock_key_count={} err={}",
trace_id,
remote_lock_key_count,
e.message());
}
throw;
}
catch (const std::exception & e)
{
if (has_writes_from_remote)
{
LOG_WARNING(
log,
"Remote write batch failed at page_directory->apply, trace_id={} lock_key_count={} err={}",
trace_id,
remote_lock_key_count,
e.what());
}
throw;
}
}();
if (has_writes_from_remote)
{
assert(remote_locks_local_mgr != nullptr);
if (remote_lock_key_count > 0 && applied_lock_ids.empty())
{
LOG_WARNING(
log,
"Remote write batch has lock keys but no applied lock ids, trace_id={} lock_key_count={}",
trace_id,
remote_lock_key_count);
}
else
{
LOG_DEBUG(
log,
"Remote write batch end, trace_id={} applied_lock_count={} lock_key_count={}",
trace_id,
applied_lock_ids.size(),
remote_lock_key_count);
}
// Remove the applied locks from checkpoint_manager.pre_lock_files
remote_locks_local_mgr->cleanAppliedS3ExternalFiles(std::move(applied_lock_ids));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,11 @@ bool UniversalPageStorageService::uploadCheckpointImpl(
{
LOG_INFO(log, "Upload checkpoint with all existing data");
}
LOG_INFO(
log,
"Checkpoint lock set, upload_sequence={} pre_lock_keys_size={}",
upload_info.upload_sequence,
upload_info.pre_lock_keys.size());
UniversalPageStorage::DumpCheckpointOptions opts{
.data_file_id_pattern = S3::S3Filename::newCheckpointDataNameTemplate(store_info.id(), upload_info.upload_sequence),
.data_file_path_pattern = local_dir_str + "dat_{seq}_{index}",
Expand Down
Loading