Skip to content

Commit 76a4ec4

Browse files
authored
vector: Fix delta compaction failure that may be caused by building a vector index on DeltaVS (#10311)
ref #9600, close #10310
* In `DeltaMergeStore::segmentEnsureDeltaLocalIndexAsync` and `DeltaMergeStore::segmentWaitDeltaLocalIndexReady`, if `segment->getDelta()->isUpdating()` is true, the segment is running an update task (SegmentMergeDelta/SegmentMerge/SegmentSplit), so the local index build job is skipped.
* In `ColumnFilePersistedSet::installCompactionResults`, if the column file set is found to have changed during the minor compaction, log a warning and roll back the minor compaction.
* `WriteBatches::rollbackWrittenLogAndData` now also removes the pending `removed_log` and `removed_data`.
Signed-off-by: JaySon-Huang <tshent@qq.com>
1 parent decca88 commit 76a4ec4

File tree

7 files changed

+374
-85
lines changed

7 files changed

+374
-85
lines changed

dbms/src/Storages/DeltaMerge/Delta/ColumnFilePersistedSet.cpp

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ void ColumnFilePersistedSet::checkColumnFiles(const ColumnFilePersisteds & new_c
7575

7676
RUNTIME_CHECK_MSG(
7777
new_rows == rows && new_deletes == deletes,
78-
"Rows and deletes check failed. Actual: rows[{}], deletes[{}]. Expected: rows[{}], deletes[{}]. Current column "
78+
"Rows and deletes check failed. Actual: rows={} deletes={} Expected: rows={} deletes={}. Current column "
7979
"files: {}, new column files: {}.", //
8080
new_rows,
8181
new_deletes,
@@ -316,6 +316,7 @@ bool ColumnFilePersistedSet::installCompactionResults(const MinorCompactionPtr &
316316
minor_compaction_version += 1;
317317
LOG_DEBUG(log, "{}, before commit compaction, persisted column files: {}", info(), detailInfo());
318318
ColumnFilePersisteds new_persisted_files;
319+
// Generate the new persisted files by the compaction results.
319320
for (const auto & task : compaction->getTasks())
320321
{
321322
if (task.is_trivial_move)
@@ -333,23 +334,28 @@ bool ColumnFilePersistedSet::installCompactionResults(const MinorCompactionPtr &
333334
|| (file->getId() != (*old_persisted_files_iter)->getId())
334335
|| (file->getRows() != (*old_persisted_files_iter)->getRows())))
335336
{
336-
throw Exception(
337-
ErrorCodes::LOGICAL_ERROR,
338-
"Compaction algorithm broken, "
339-
"compaction={{{}}} persisted_files={} "
337+
// The ColumnFile in `to_compact` is not in the latest persisted files, skip
338+
// the minor compaction.
339+
LOG_WARNING(
340+
log,
341+
"Minor Compaction is skipped, "
342+
"compaction={{{}}} persisted_files={} task={} "
340343
"old_persisted_files_iter.is_end={} "
341344
"file->getId={} old_persist_files->getId={} file->getRows={} old_persist_files->getRows={}",
342345
compaction->info(),
343346
detailInfo(),
347+
task.toString(),
344348
old_persisted_files_iter == persisted_files.end(),
345349
file->getId(),
346350
old_persisted_files_iter == persisted_files.end() ? -1 : (*old_persisted_files_iter)->getId(),
347351
file->getRows(),
348352
old_persisted_files_iter == persisted_files.end() ? -1 : (*old_persisted_files_iter)->getRows());
353+
return false;
349354
}
350355
old_persisted_files_iter++;
351356
}
352357
}
358+
// The new persisted_files also contains the files that were appended during the minor compaction.
353359
while (old_persisted_files_iter != persisted_files.end())
354360
{
355361
new_persisted_files.emplace_back(*old_persisted_files_iter);
@@ -387,7 +393,7 @@ ColumnFileSetSnapshotPtr ColumnFilePersistedSet::createSnapshot(const IColumnFil
387393
{
388394
LOG_ERROR(
389395
log,
390-
"Rows and deletes check failed. Actual: rows[{}], deletes[{}]. Expected: rows[{}], deletes[{}].",
396+
"Rows and deletes check failed. Actual: rows={} deletes={}. Expected: rows={} deletes={}",
391397
total_rows,
392398
total_deletes,
393399
rows.load(),

dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.cpp

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -224,7 +224,7 @@ std::pair<ColumnFiles, ColumnFilePersisteds> DeltaValueSpace::cloneNewlyAppended
224224
snapshot_persisted_files.begin(),
225225
snapshot_persisted_files.end());
226226
// If there were flush since the snapshot, the flushed files should be behind the files in the snapshot.
227-
// So let's place these "flused files" after the persisted files in snapshot.
227+
// So let's place these "flushed files" after the persisted files in snapshot.
228228
head_persisted_files.insert(head_persisted_files.end(), flushed_mem_files.begin(), flushed_mem_files.end());
229229

230230
auto new_persisted_files = persisted_file_set->diffColumnFiles(head_persisted_files);
@@ -354,9 +354,7 @@ bool DeltaValueSpace::flush(DMContext & context)
354354
SCOPE_EXIT({
355355
bool v = true;
356356
if (!is_flushing.compare_exchange_strong(v, false))
357-
throw Exception(
358-
fmt::format("Delta is expected to be flushing, delta={}", simple_info),
359-
ErrorCodes::LOGICAL_ERROR);
357+
throw Exception(ErrorCodes::LOGICAL_ERROR, "Delta is expected to be flushing, delta={}", simple_info);
360358
});
361359

362360
LOG_DEBUG(log, "Flush start, delta={}", info());
@@ -475,6 +473,8 @@ bool DeltaValueSpace::compact(DMContext & context)
475473
/*tracing_id*/ fmt::format("minor_compact_{}", simpleInfo()));
476474
}
477475

476+
SYNC_FOR("DeltaValueSpace::compact_after_compaction_task_build");
477+
478478
WriteBatches wbs(*context.storage_pool, context.getWriteLimiter());
479479
{
480480
// do compaction task
@@ -500,9 +500,11 @@ bool DeltaValueSpace::compact(DMContext & context)
500500
}
501501
if (!compaction_task->commit(persisted_file_set, wbs))
502502
{
503-
LOG_WARNING(log, "Structure has been updated during compact, delta={}", simpleInfo());
503+
LOG_DEBUG(
504+
log,
505+
"Structure has been updated during compact, delta compaction is stopped, delta={}",
506+
simpleInfo());
504507
wbs.rollbackWrittenLogAndData();
505-
LOG_DEBUG(log, "Compact stop because structure got updated, delta={}", simpleInfo());
506508
return false;
507509
}
508510
// Reset to the index of first file that can be compacted if the minor compaction succeed,

dbms/src/Storages/DeltaMerge/Delta/MinorCompaction.h

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,19 @@ class MinorCompaction : public std::enable_shared_from_this<MinorCompaction>
5050
total_rows += column_file->getRows();
5151
total_bytes += column_file->getBytes();
5252
}
53+
54+
String toString() const
55+
{
56+
return fmt::format(
57+
"Task{{to_compact_size={} first_file_index={} total_rows={} total_bytes={} is_trivial_move={} "
58+
"result_rows={}}}",
59+
to_compact.size(),
60+
first_file_index,
61+
total_rows,
62+
total_bytes,
63+
is_trivial_move,
64+
result ? result->getRows() : 0);
65+
}
5366
};
5467
using Tasks = std::vector<Task>;
5568

dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalSegment.cpp

Lines changed: 69 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
#include <Storages/DeltaMerge/ColumnFile/ColumnFileDataProvider.h>
2121
#include <Storages/DeltaMerge/ColumnFile/ColumnFileTinyLocalIndexWriter.h>
2222
#include <Storages/DeltaMerge/DMContext.h>
23+
#include <Storages/DeltaMerge/Delta/DeltaValueSpace.h>
2324
#include <Storages/DeltaMerge/DeltaMergeStore.h>
2425
#include <Storages/DeltaMerge/File/DMFileLocalIndexWriter.h>
2526
#include <Storages/DeltaMerge/LocalIndexerScheduler.h>
@@ -851,75 +852,110 @@ void DeltaMergeStore::segmentEnsureStableLocalIndexWithErrorReport(
851852
}
852853
}
853854

854-
bool DeltaMergeStore::segmentEnsureDeltaLocalIndexAsync(const SegmentPtr & segment)
855+
namespace
856+
{
857+
struct LocalIndexOnDeltaVSBuildInfo
858+
{
859+
ColumnFileTinyLocalIndexWriter::LocalIndexBuildInfo build_info;
860+
std::weak_ptr<DeltaValueSpace> delta_weak_ptr;
861+
};
862+
std::optional<LocalIndexOnDeltaVSBuildInfo> //
863+
genBuildInfoFromDeltaVS(const SegmentPtr & segment, const LocalIndexInfosSnapshot & local_index_infos)
855864
{
856-
RUNTIME_CHECK(segment != nullptr);
857-
858-
auto local_index_infos_snap = getLocalIndexInfosSnapshot();
859-
if (!local_index_infos_snap)
860-
return false;
861-
862865
// Acquire a lock to make sure delta is not changed during the process.
863866
auto lock = segment->getUpdateLock();
864867
if (!lock)
865-
return false;
868+
return std::nullopt;
869+
// The segment is running an update(SegmentMergeDelta/SegmentMerge/SegmentSplit) task, skip the index build.
870+
if (segment->getDelta()->isUpdating())
871+
return std::nullopt;
872+
873+
// In case nothing to be built
866874
auto column_file_persisted_set = segment->getDelta()->getPersistedFileSet();
867875
if (!column_file_persisted_set)
868-
return false;
876+
return std::nullopt;
869877
auto build_info
870-
= ColumnFileTinyLocalIndexWriter::getLocalIndexBuildInfo(local_index_infos_snap, column_file_persisted_set);
878+
= ColumnFileTinyLocalIndexWriter::getLocalIndexBuildInfo(local_index_infos, column_file_persisted_set);
871879
if (!build_info.indexes_to_build || build_info.indexes_to_build->empty())
872-
return false;
880+
return std::nullopt;
881+
873882
// Use weak_ptr to avoid blocking gc.
874883
auto delta_weak_ptr = std::weak_ptr<DeltaValueSpace>(segment->getDelta());
875884
lock->unlock();
885+
return LocalIndexOnDeltaVSBuildInfo{build_info, delta_weak_ptr};
886+
}
887+
} // namespace
888+
889+
bool DeltaMergeStore::segmentEnsureDeltaLocalIndexAsync(const SegmentPtr & segment)
890+
{
891+
RUNTIME_CHECK(segment != nullptr);
892+
893+
auto local_index_infos_snap = getLocalIndexInfosSnapshot();
894+
if (!local_index_infos_snap)
895+
return false;
896+
897+
auto delta_vs_build_info = genBuildInfoFromDeltaVS(segment, local_index_infos_snap);
898+
if (!delta_vs_build_info)
899+
{
900+
LOG_DEBUG(
901+
log,
902+
"segmentEnsureDeltaLocalIndexAsync - Give up because no index to build or delta is being updated, "
903+
"segment={}",
904+
segment->simpleInfo());
905+
return true;
906+
}
876907

877908
auto store_weak_ptr = weak_from_this();
878-
auto tracing_id = fmt::format("segmentEnsureDeltaLocalIndexAsync source_segment={}", segment->simpleInfo());
879-
auto workload = [store_weak_ptr, build_info, delta_weak_ptr, segment, tracing_id]() -> void {
909+
const auto source_segment_info = segment->simpleInfo();
910+
auto workload = [store_weak_ptr, delta_vs_build_info, source_segment_info]() -> void {
880911
auto store = store_weak_ptr.lock();
881912
if (!store) // Store is destroyed before the task is executed.
882913
return;
883-
auto delta = delta_weak_ptr.lock();
914+
auto delta = delta_vs_build_info->delta_weak_ptr.lock();
884915
if (!delta) // Delta is destroyed before the task is executed.
885916
return;
917+
auto tracing_id = fmt::format("segmentEnsureDeltaLocalIndexAsync source_segment={}", source_segment_info);
886918
auto dm_context = store->newDMContext( //
887919
store->global_context,
888920
store->global_context.getSettingsRef(),
889921
tracing_id);
890-
const auto source_segment_info = segment->simpleInfo();
891-
store->segmentEnsureDeltaLocalIndex(*dm_context, build_info.indexes_to_build, delta, source_segment_info);
922+
store->segmentEnsureDeltaLocalIndex(
923+
*dm_context,
924+
delta_vs_build_info->build_info.indexes_to_build,
925+
delta,
926+
source_segment_info);
892927
};
893928

894929
auto indexer_scheduler = global_context.getGlobalLocalIndexerScheduler();
895930
RUNTIME_CHECK(indexer_scheduler != nullptr);
896931
try
897932
{
898933
// A new task for these indexes is being generated, so clear any existing error_message in the segment
899-
segment->clearIndexBuildError(build_info.index_ids);
934+
segment->clearIndexBuildError(delta_vs_build_info->build_info.index_ids);
900935

901936
auto [ok, reason] = indexer_scheduler->pushTask(LocalIndexerScheduler::Task{
902937
.keyspace_id = keyspace_id,
903938
.table_id = physical_table_id,
904-
.file_ids = build_info.file_ids,
905-
.request_memory = build_info.estimated_memory_bytes,
939+
.file_ids = delta_vs_build_info->build_info.file_ids,
940+
.request_memory = delta_vs_build_info->build_info.estimated_memory_bytes,
906941
.workload = workload,
907942
});
908943
if (ok)
909944
return true;
910945

911-
segment->setIndexBuildError(build_info.index_ids, reason);
946+
segment->setIndexBuildError(delta_vs_build_info->build_info.index_ids, reason);
947+
auto tracing_id = fmt::format("segmentEnsureDeltaLocalIndexAsync source_segment={}", source_segment_info);
912948
LOG_ERROR(
913949
log->getChild(tracing_id),
914950
"Failed to generate async segment stable index task, index_ids={} reason={}",
915-
build_info.index_ids,
951+
delta_vs_build_info->build_info.index_ids,
916952
reason);
917953
return false;
918954
}
919955
catch (...)
920956
{
921957
const auto message = getCurrentExceptionMessage(false, false);
922-
segment->setIndexBuildError(build_info.index_ids, message);
958+
segment->setIndexBuildError(delta_vs_build_info->build_info.index_ids, message);
923959

924960
tryLogCurrentException(log);
925961

@@ -937,18 +973,15 @@ bool DeltaMergeStore::segmentWaitDeltaLocalIndexReady(const SegmentPtr & segment
937973
if (!local_index_infos_snap)
938974
return true;
939975

940-
// Acquire a lock to make sure delta is not changed during the process.
941-
auto lock = segment->mustGetUpdateLock();
942-
auto column_file_persisted_set = segment->getDelta()->getPersistedFileSet();
943-
if (!column_file_persisted_set)
944-
return false;
945-
auto build_info
946-
= ColumnFileTinyLocalIndexWriter::getLocalIndexBuildInfo(local_index_infos, column_file_persisted_set);
947-
// Use weak_ptr to avoid blocking gc.
948-
auto delta_weak_ptr = std::weak_ptr<DeltaValueSpace>(segment->getDelta());
949-
lock.unlock();
950-
if (!build_info.indexes_to_build || build_info.indexes_to_build->empty())
951-
return false;
976+
auto delta_vs_build_info = genBuildInfoFromDeltaVS(segment, local_index_infos_snap);
977+
if (!delta_vs_build_info)
978+
{
979+
LOG_INFO(
980+
log,
981+
"WaitDeltaLocalIndexReady - Give up because no index to build or delta is being updated, segment={}",
982+
segment->simpleInfo());
983+
return true;
984+
}
952985

953986
auto segment_id = segment->segmentId();
954987
static constexpr size_t MAX_CHECK_TIME_SECONDS = 60; // 60s
@@ -964,11 +997,11 @@ bool DeltaMergeStore::segmentWaitDeltaLocalIndexReady(const SegmentPtr & segment
964997
if (!column_file_persisted_set)
965998
return false; // ColumnFilePersistedSet is not exist, return false
966999
bool all_indexes_built = true;
967-
auto delta_ptr = delta_weak_ptr.lock();
1000+
auto delta_ptr = delta_vs_build_info->delta_weak_ptr.lock();
9681001
if (auto lock = delta_ptr ? delta_ptr->getLock() : std::nullopt; lock)
9691002
{
9701003
const auto & column_files = column_file_persisted_set->getFiles();
971-
for (const auto & index : *build_info.indexes_to_build)
1004+
for (const auto & index : *delta_vs_build_info->build_info.indexes_to_build)
9721005
{
9731006
for (const auto & column_file : column_files)
9741007
{

0 commit comments

Comments
 (0)