Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
6ead46d
Add metrics and logging
JaySon-Huang Mar 21, 2026
66a18b4
metrics: Update grafana panel
JaySon-Huang Mar 21, 2026
14140e0
Add debug logging
JaySon-Huang Mar 21, 2026
40ac415
tests: add write-group remote apply behavior coverage
JaySon-Huang Mar 21, 2026
5856c85
fix(page): return writer-scoped applied remote lock ids
JaySon-Huang Mar 21, 2026
d2b5ace
cleanup(page): remove investigation logs and keep actionable signals
JaySon-Huang Mar 22, 2026
f9b1b68
refactor(page): simplify remote write and lock cleanup logging
JaySon-Huang Mar 22, 2026
9aad725
docs: refine exception and logging guidance
JaySon-Huang Mar 22, 2026
998eb35
refactor(page): centralize remote write count handling
JaySon-Huang Mar 22, 2026
5a1e219
fix(page): cleanup pre-lock keys on write failure
JaySon-Huang Mar 22, 2026
83df9b1
tests(page): document purposes and steps for lock cleanup cases
JaySon-Huang Mar 22, 2026
98d0036
tests(page): force async launch for syncpoint concurrency cases
JaySon-Huang Mar 22, 2026
b360e87
fix(page): avoid partial pre-lock residue on lock creation
JaySon-Huang Mar 22, 2026
7f0b357
tests(page): cover lock return semantics and partial failure
JaySon-Huang Mar 22, 2026
bda0e5a
tests(page): enforce async launch in write-group syncpoint tests
JaySon-Huang Mar 22, 2026
48d3aa7
Format codes
JaySon-Huang Mar 22, 2026
ed82d79
enhance(s3gc): add owner-only periodic storage summary task
JaySon-Huang Mar 22, 2026
7b09efe
metrics: Update grafana panel about tiflash_storage_s3_store_summary_…
JaySon-Huang Mar 22, 2026
b007911
Revert "enhance(s3gc): add owner-only periodic storage summary task"
JaySon-Huang Mar 23, 2026
7393170
Revert the changes in grafana
JaySon-Huang Mar 23, 2026
c71feec
Address comment
JaySon-Huang Mar 23, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions dbms/src/Common/TiFlashMetrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -846,6 +846,15 @@ static_assert(RAFT_REGION_BIG_WRITE_THRES * 4 < RAFT_REGION_BIG_WRITE_MAX, "Inva
F(type_to_finished, {"type", "to_finished"}), \
F(type_to_error, {"type", "to_error"}), \
F(type_to_cancelled, {"type", "to_cancelled"})) \
M(tiflash_storage_s3_lock_mgr_status, "S3 Lock Manager", Gauge, F(type_prelock_keys, {{"type", "prelock_keys"}})) \
M(tiflash_storage_s3_lock_mgr_counter, \
"S3 Lock Manager Counter", \
Counter, \
F(type_create_lock_local, {{"type", "create_lock_local"}}), \
F(type_create_lock_ingest, {{"type", "create_lock_ingest"}}), \
F(type_clean_lock, {{"type", "clean_lock"}}), \
F(type_clean_lock_erase_hit, {{"type", "clean_lock_erase_hit"}}), \
F(type_clean_lock_erase_miss, {{"type", "clean_lock_erase_miss"}})) \
M(tiflash_storage_s3_gc_status, \
"S3 GC status", \
Gauge, \
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1283,6 +1283,10 @@ SegmentPtr DeltaMergeStore::segmentMergeDelta(

if (!isSegmentValid(lock, segment))
{
LOG_INFO(
log,
"MergeDelta - Give up segmentMergeDelta because segment not valid, reason=concurrent_update segment={}",
segment->simpleInfo());
LOG_DEBUG(
log,
"MergeDelta - Give up segmentMergeDelta because segment not valid, segment={}",
Expand Down
6 changes: 6 additions & 0 deletions dbms/src/Storages/DeltaMerge/WriteBatchesImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,12 @@ struct WriteBatches : private boost::noncopyable

void rollbackWrittenLogAndData()
{
LOG_INFO(
Logger::get("WriteBatches"),
"Rollback written log/data, run_mode={} written_log_pages={} written_data_pages={}",
magic_enum::enum_name(run_mode),
written_log.size(),
written_data.size());
WriteBatchWrapper log_wb(run_mode, keyspace_id, StorageType::Log, ns_id);
for (auto p : written_log)
log_wb.delPage(p);
Expand Down
37 changes: 37 additions & 0 deletions dbms/src/Storages/Page/V3/BlobStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,41 @@ typename BlobStore<Trait>::PageEntriesEdit BlobStore<Trait>::write(
const size_t all_page_data_size = wb.getTotalDataSize();

PageEntriesEdit edit;
// Diagnostic helper: summarize the records of `edit` that carry remote
// (checkpoint) info so that remote-write issues can be correlated with this
// write batch. `stage` tags which code path of BlobStore::write emitted the
// line. Emits nothing when the edit has no checkpoint-info records.
// NOTE(review): logs `wb.toString()`, which may be large — presumably
// acceptable for investigation-level logging; confirm before wide rollout.
auto log_remote_edit_summary = [&](const char * stage) {
    size_t put_external_count = 0;
    // Counts PUT records; named "remote" because only edits containing
    // checkpoint info (i.e. remote data locations) are logged at all.
    size_t put_remote_count = 0;
    size_t checkpoint_info_count = 0;
    // Bounded sample (at most 8) of remote data file ids, to keep the log
    // line size under control.
    std::vector<String> sample_data_file_ids;
    for (const auto & rec : edit.getRecords())
    {
        if (rec.type == EditRecordType::PUT_EXTERNAL)
            ++put_external_count;
        else if (rec.type == EditRecordType::PUT)
            ++put_remote_count;
        if (rec.entry.checkpoint_info.has_value())
        {
            ++checkpoint_info_count;
            if (sample_data_file_ids.size() < 8)
            {
                sample_data_file_ids.emplace_back(*rec.entry.checkpoint_info.data_location.data_file_id);
            }
        }
    }
    // Purely local edits are the common path; stay silent for them.
    if (checkpoint_info_count == 0)
        return;
    LOG_INFO(
        log,
        "BlobStore::write remote edit summary, stage={} wb_size={} edit_size={} put_external_count={} "
        "put_remote_count={} checkpoint_info_count={} sample_data_file_ids={} wb={}",
        stage,
        wb.size(),
        edit.size(),
        put_external_count,
        put_remote_count,
        checkpoint_info_count,
        sample_data_file_ids,
        wb.toString());
};

if (all_page_data_size == 0)
{
Expand Down Expand Up @@ -491,6 +526,7 @@ typename BlobStore<Trait>::PageEntriesEdit BlobStore<Trait>::write(
throw Exception(fmt::format("Unknown write type: {}", magic_enum::enum_name(write.type)));
}
}
log_remote_edit_summary("zero_data_path");
return edit;
}

Expand Down Expand Up @@ -664,6 +700,7 @@ typename BlobStore<Trait>::PageEntriesEdit BlobStore<Trait>::write(
throw e;
}

log_remote_edit_summary("normal_path");
return edit;
}

Expand Down
96 changes: 93 additions & 3 deletions dbms/src/Storages/Page/V3/PageDirectory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,14 @@
#include <common/logger_useful.h>

#include <magic_enum.hpp>
#include <functional>
#include <memory>
#include <mutex>
#include <optional>
#include <shared_mutex>
#include <type_traits>
#include <utility>
#include <vector>


#ifdef FIU_ENABLE
Expand Down Expand Up @@ -1612,6 +1614,11 @@ std::unordered_set<String> PageDirectory<Trait>::apply(PageEntriesEdit && edit,
CurrentMetrics::Increment pending_writer_size{CurrentMetrics::PSPendingWriterNum};
Writer w;
w.edit = &edit;
for (const auto & r : edit.getRecords())
{
if (r.entry.checkpoint_info.has_value())
w.applied_data_files.emplace(*r.entry.checkpoint_info.data_location.data_file_id);
}

Stopwatch watch;
std::unique_lock apply_lock(apply_mutex);
Expand Down Expand Up @@ -1640,8 +1647,8 @@ std::unordered_set<String> PageDirectory<Trait>::apply(PageEntriesEdit && edit,
}
}
// the `applied_data_files` will be returned by the write
// group owner, others just return an empty set.
return {};
// group owner, followers return their own data_file_ids.
return std::move(w.applied_data_files);
}

/// This thread now is the write group owner, build the group. It will merge the
Expand Down Expand Up @@ -1704,12 +1711,53 @@ std::unordered_set<String> PageDirectory<Trait>::apply(PageEntriesEdit && edit,

SYNC_FOR("before_PageDirectory::apply_to_memory");
std::unordered_set<String> applied_data_files;
size_t edit_digest = 0;
size_t put_count = 0;
size_t put_external_count = 0;
size_t checkpoint_info_count = 0;
size_t put_external_with_checkpoint_info_count = 0;
std::vector<String> suspicious_records;
std::vector<String> candidate_records;
{
std::unique_lock table_lock(table_rw_mutex);

// create entry version list for page_id.
for (const auto & r : edit.getRecords())
{
if (r.type == EditRecordType::PUT)
++put_count;
else if (r.type == EditRecordType::PUT_EXTERNAL)
++put_external_count;
if (r.entry.checkpoint_info.has_value())
{
++checkpoint_info_count;
if (r.type == EditRecordType::PUT_EXTERNAL)
++put_external_with_checkpoint_info_count;
}
if (r.type == EditRecordType::PUT_EXTERNAL && !r.entry.checkpoint_info.has_value()
&& suspicious_records.size() < 8)
{
suspicious_records.emplace_back(fmt::format("page_id={}", r.page_id));
}
if ((r.type == EditRecordType::PUT || r.type == EditRecordType::PUT_EXTERNAL) && candidate_records.size() < 12)
{
if (r.entry.checkpoint_info.has_value())
{
candidate_records.emplace_back(fmt::format(
"type={} page_id={} has_checkpoint=1 data_file_id={}",
magic_enum::enum_name(r.type),
r.page_id,
*r.entry.checkpoint_info.data_location.data_file_id));
}
else
{
candidate_records.emplace_back(fmt::format(
"type={} page_id={} has_checkpoint=0",
magic_enum::enum_name(r.type),
r.page_id));
}
}

// Protected in write_lock
if (r.type == EditRecordType::DEL)
{
Expand Down Expand Up @@ -1799,8 +1847,50 @@ std::unordered_set<String> PageDirectory<Trait>::apply(PageEntriesEdit && edit,
sequence.fetch_add(edit_size);
}

for (const auto & rec : candidate_records)
{
size_t rec_hash = std::hash<String>{}(rec);
edit_digest ^= rec_hash + 0x9e3779b97f4a7c15ULL + (edit_digest << 6) + (edit_digest >> 2);
}

if (checkpoint_info_count > 0)
{
// NOTE: test-only verbose diagnostics. Remove or lower level before production rollout.
LOG_INFO(
log,
"PageDirectory apply checkpoint summary, edit_size={} put_count={} put_external_count={} "
"checkpoint_info_count={} put_external_with_checkpoint_info_count={} edit_digest={} applied_data_files={} "
"candidate_records={} suspicious_records={}",
edit_size,
put_count,
put_external_count,
checkpoint_info_count,
put_external_with_checkpoint_info_count,
edit_digest,
applied_data_files,
candidate_records,
suspicious_records);
}

if (applied_data_files.empty() && checkpoint_info_count > 0)
{
LOG_INFO(
log,
"PageDirectory apply has no applied_data_files with checkpoint_info records, edit_size={} put_count={} "
"put_external_count={} checkpoint_info_count={} put_external_with_checkpoint_info_count={} edit_digest={} "
"suspicious_records={} candidate_records={}",
edit_size,
put_count,
put_external_count,
checkpoint_info_count,
put_external_with_checkpoint_info_count,
edit_digest,
suspicious_records,
candidate_records);
}

success = true;
return applied_data_files;
return std::move(w.applied_data_files);
}

template <typename Trait>
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Storages/Page/V3/PageDirectory.h
Original file line number Diff line number Diff line change
Expand Up @@ -628,6 +628,7 @@ class PageDirectory
struct Writer
{
PageEntriesEdit * edit;
std::unordered_set<String> applied_data_files;
bool done = false; // The work has been performed by other thread
bool success = false; // The work complete successfully
std::unique_ptr<DB::Exception> exception;
Expand Down
72 changes: 64 additions & 8 deletions dbms/src/Storages/Page/V3/Universal/S3LockLocalManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#include <Common/Exception.h>
#include <Common/Stopwatch.h>
#include <Common/TiFlashMetrics.h>
#include <Flash/Disaggregated/S3LockClient.h>
#include <Storages/Page/V3/CheckpointFile/CPManifestFileReader.h>
#include <Storages/Page/V3/CheckpointFile/Proto/manifest_file.pb.h>
Expand All @@ -26,6 +27,8 @@

#include <magic_enum.hpp>

#include <vector>


namespace DB::ErrorCodes
{
Expand All @@ -40,7 +43,7 @@ S3LockLocalManager::S3LockLocalManager()
{}

// `store_id` is inited later because they may not
// accessable when S3LockLocalManager is created.
// accessible when S3LockLocalManager is created.
std::optional<CheckpointProto::ManifestFilePrefix> S3LockLocalManager::initStoreInfo(
StoreID actual_store_id,
DB::S3::S3LockClientPtr s3lock_client_,
Expand Down Expand Up @@ -179,6 +182,18 @@ void S3LockLocalManager::createS3LockForWriteBatch(UniversalWriteBatch & write_b
lock_key = std::make_shared<String>(lock_result);
}

if (!s3_datafiles_to_lock.empty())
{
std::vector<String> lock_mappings;
lock_mappings.reserve(s3_datafiles_to_lock.size());
for (const auto & [input_key, lock_key] : s3_datafiles_to_lock)
{
if (lock_key)
lock_mappings.emplace_back(fmt::format("{} -> {}", input_key, *lock_key));
}
LOG_INFO(log, "S3 lock mapping for write batch, mapping_count={} mappings={}", lock_mappings.size(), lock_mappings);
}

for (auto & w : write_batch.getMutWrites())
{
// Here we will replace the name to be the S3LockFile key name for later
Expand Down Expand Up @@ -224,7 +239,8 @@ String S3LockLocalManager::createS3Lock(
// TODO: handle s3 network error and retry?
auto s3_client = S3::ClientFactory::instance().sharedTiFlashClient();
S3::uploadEmptyFile(*s3_client, lockkey);
LOG_DEBUG(log, "S3 lock created for local datafile, lockkey={}", lockkey);
GET_METRIC(tiflash_storage_s3_lock_mgr_counter, type_create_lock_local).Increment();
LOG_INFO(log, "S3 lock created for local datafile, datafile_key={} lockkey={}", datafile_key, lockkey);
}
else
{
Expand All @@ -237,7 +253,8 @@ String S3LockLocalManager::createS3Lock(
{
throw Exception(ErrorCodes::S3_LOCK_CONFLICT, err_msg);
}
LOG_DEBUG(log, "S3 lock created for ingest datafile, lockkey={}", lockkey);
GET_METRIC(tiflash_storage_s3_lock_mgr_counter, type_create_lock_ingest).Increment();
LOG_INFO(log, "S3 lock created for ingest datafile, datafile_key={} lockkey={}", datafile_key, lockkey);
}

// The related S3 data files in write batch is not applied into PageDirectory,
Expand All @@ -247,19 +264,58 @@ String S3LockLocalManager::createS3Lock(
{
std::unique_lock wlatch_keys(mtx_lock_keys);
pre_lock_keys.emplace(lockkey);
GET_METRIC(tiflash_storage_s3_lock_mgr_status, type_prelock_keys).Set(pre_lock_keys.size());
}
return lockkey;
}

/// Clean up the pre-lock keys for S3 data files whose entries have been
/// applied into the PageDirectory.
/// After the entries are applied, the manifest can obtain the S3 lock key
/// from `VersionedPageEntries`, so the corresponding `pre_lock_keys` entries
/// are no longer needed.
/// A miss (an applied file with no pre-lock entry) is unexpected but
/// tolerated; it is surfaced through logging and the erase_miss metric.
void S3LockLocalManager::cleanAppliedS3ExternalFiles(std::unordered_set<String> && applied_s3files)
{
    size_t erase_hit = 0;
    size_t erase_miss = 0;
    size_t remaining_pre_lock_keys = 0;
    std::vector<String> erased_keys;
    std::vector<String> missing_keys;
    erased_keys.reserve(applied_s3files.size());
    missing_keys.reserve(applied_s3files.size());
    {
        // All reads/writes of `pre_lock_keys` must happen under `mtx_lock_keys`.
        std::unique_lock wlatch_keys(mtx_lock_keys);
        for (const auto & file : applied_s3files)
        {
            if (pre_lock_keys.erase(file) > 0)
            {
                ++erase_hit;
                erased_keys.emplace_back(file);
            }
            else
            {
                ++erase_miss;
                missing_keys.emplace_back(file);
            }
        }
        remaining_pre_lock_keys = pre_lock_keys.size();
        GET_METRIC(tiflash_storage_s3_lock_mgr_status, type_prelock_keys).Set(remaining_pre_lock_keys);
    } // release the lock on mtx_lock_keys before logging
    LOG_INFO(
        log,
        "Clean applied S3 external files, applied_count={} erase_hit={} erase_miss={} remaining_pre_lock_keys={}",
        applied_s3files.size(),
        erase_hit,
        erase_miss,
        remaining_pre_lock_keys);
    for (const auto & key : erased_keys)
    {
        LOG_INFO(log, "Clean applied S3 external files, erase_hit lockkey={}", key);
    }
    for (const auto & key : missing_keys)
    {
        LOG_INFO(log, "Clean applied S3 external files, erase_miss lockkey={}", key);
    }
    GET_METRIC(tiflash_storage_s3_lock_mgr_counter, type_clean_lock).Increment();
    GET_METRIC(tiflash_storage_s3_lock_mgr_counter, type_clean_lock_erase_hit).Increment(erase_hit);
    GET_METRIC(tiflash_storage_s3_lock_mgr_counter, type_clean_lock_erase_miss).Increment(erase_miss);
}

} // namespace DB::PS::V3
Loading