diff --git a/cpp/arcticdb/CMakeLists.txt b/cpp/arcticdb/CMakeLists.txt index f6a0efc5151..d9d83776822 100644 --- a/cpp/arcticdb/CMakeLists.txt +++ b/cpp/arcticdb/CMakeLists.txt @@ -1248,6 +1248,7 @@ if(${TEST}) processing/test/benchmark_ternary.cpp util/test/benchmark_bitset.cpp version/test/benchmark_write.cpp + version/test/benchmark_symbol_list.cpp ) set_pdb_name_per_translation_unit(${benchmark_srcs}) diff --git a/cpp/arcticdb/version/symbol_list.cpp b/cpp/arcticdb/version/symbol_list.cpp index 9364754fb13..59fc2e89405 100644 --- a/cpp/arcticdb/version/symbol_list.cpp +++ b/cpp/arcticdb/version/symbol_list.cpp @@ -91,59 +91,6 @@ std::vector load_previous_from_version_keys( return symbols; } -std::vector get_all_symbol_list_keys( - const std::shared_ptr& store, SymbolListData& data, WillAttemptCompaction will_attempt_compaction -) { - std::vector output; - uint64_t uncompacted_keys_found = 0; - store->iterate_type( - KeyType::SYMBOL_LIST, - [&data, &output, &uncompacted_keys_found, will_attempt_compaction](auto&& key) { - auto atom_key = to_atom(std::forward(key)); - if (atom_key.id() != compaction_id) { - uncompacted_keys_found++; - } - if (uncompacted_keys_found == warning_threshold() && !data.warned_expected_slowdown_) { - log::symbol().warn( - "`list_symbols` may take longer than expected as there have been many modifications " - "since `list_symbols` was last called. \n\n" - "See here for more information: " - "https://docs.arcticdb.io/latest/technical/on_disk_storage/#symbol-list-caching\n\n" - "To resolve, run `list_symbols` through to completion frequently. " - "Note: write access to storage is required for compaction. " - "{}.\n" - "Note: This warning will only appear once.\n", - will_attempt_compaction - ); - - data.warned_expected_slowdown_ = true; - } - - output.push_back(atom_key); - } - ); - - std::sort(output.begin(), output.end(), [](const AtomKey& left, const AtomKey& right) { - // Some very old symbol list keys have a non-zero version number, but with different semantics to the new style, - // so ignore it. See arcticdb-man#116. Most old style symbol list keys have version ID 0 anyway. - auto left_version = is_new_style_key(left) ? left.version_id() : 0; - auto right_version = is_new_style_key(right) ? right.version_id() : 0; - return std::tie(left.start_index(), left_version, left.creation_ts()) < - std::tie(right.start_index(), right_version, right.creation_ts()); - }); - return output; -} - -MaybeCompaction last_compaction(const std::vector& keys) { - auto pos = std::find_if(keys.rbegin(), keys.rend(), [](const auto& key) { return key.id() == compaction_id; }); - - if (pos == keys.rend()) { - return std::nullopt; - } else { - return {(pos + 1).base()}; // reverse_iterator -> forward itr has an offset of 1 per docs - } -} - // The below string_at and scalar_at functions should be used for symbol list cache segments instead of the ones // provided in SegmentInMemory, because the symbol list structure is the only place where columns can have more entries // than the segment has rows. Hence, we need to bypass the checks inside SegmentInMemory's function and directly call @@ -269,20 +216,90 @@ std::vector read_from_storage(const std::shared_ptr& keys) { - MapType map; - for (const auto& key : keys) { - const auto& action_id = key.id(); - if (action_id == compaction_id) - continue; +JournalEntryData journal_entry_from_atom(const AtomKey& key) { + return {key.version_id(), + key.creation_ts(), + key.content_hash(), + std::get(key.id()) == DeleteSymbol ? ActionType::DELETE : ActionType::ADD, + is_new_style_key(key)}; +} + +// Reconstructs the original AtomKey from a JournalEntryData + the owning symbol (map key). +// For new-style keys the end_index encodes the version marker; for old-style it equals the symbol. +AtomKey atom_key_from_journal_entry(const StreamId& symbol, const JournalEntryData& ck) { + IndexValue end_index; + if (ck.is_new_style) { + end_index = std::holds_alternative(symbol) ? IndexValue{StringIndex{std::string{version_string}}} + : IndexValue{NumericIndex{version_identifier}}; + } else { + end_index = IndexValue{symbol}; + } + return atom_key_builder() + .version_id(ck.key_version_id) + .creation_ts(ck.creation_ts) + .content_hash(ck.content_hash) + .start_index(IndexValue{symbol}) + .end_index(end_index) + .build(action_id(ck.action), KeyType::SYMBOL_LIST); +} + +SymbolEntryData to_symbol_entry_data(const JournalEntryData& ck) { + const auto reference_id = ck.is_new_style ? ck.key_version_id : unknown_version_id; + return {reference_id, ck.creation_ts, ck.action}; +} + +void add_journal_entry(JournalMapType& update_map, const AtomKey& key) { + update_map[key.start_index()].emplace_back(journal_entry_from_atom(key)); +} - const auto& symbol = key.start_index(); - const auto version_id = is_new_style_key(key) ? key.version_id() : unknown_version_id; - const auto timestamp = key.creation_ts(); - ActionType action = std::get(action_id) == DeleteSymbol ? ActionType::DELETE : ActionType::ADD; - map[symbol].emplace_back(version_id, timestamp, action); +void sort_journal_map(JournalMapType& update_map) { + for (auto& [symbol, keys] : update_map) { + std::sort(keys.begin(), keys.end(), [](const JournalEntryData& a, const JournalEntryData& b) { + auto a_ver = a.is_new_style ? a.key_version_id : VersionId{0}; + auto b_ver = b.is_new_style ? b.key_version_id : VersionId{0}; + return std::tie(a_ver, a.creation_ts) < std::tie(b_ver, b.creation_ts); + }); } - return map; +} + +/// Single-pass iteration over SYMBOL_LIST keys: builds the update map and locates the latest +/// compaction key. Always uses JournalEntryData (32B/entry) for the update map. +JournalResult load_journal_streaming( + const std::shared_ptr& store, SymbolListData& data, WillAttemptCompaction will_attempt_compaction +) { + JournalResult result; + size_t uncompacted_keys_found = 0; + + store->iterate_type(KeyType::SYMBOL_LIST, [&](auto&& key) { + auto atom_key = to_atom(std::forward(key)); + result.total_key_count++; + + if (atom_key.id() == compaction_id) { + if (!result.compaction_key || atom_key.creation_ts() > result.compaction_key->creation_ts()) + result.compaction_key = atom_key; + result.compaction_keys.emplace_back(std::move(atom_key)); + } else { + ++uncompacted_keys_found; + if (uncompacted_keys_found == warning_threshold() && !data.warned_expected_slowdown_) { + log::symbol().warn( + "`list_symbols` may take longer than expected as there have been many modifications " + "since `list_symbols` was last called. \n\n" + "See here for more information: " + "https://docs.arcticdb.io/latest/technical/on_disk_storage/#symbol-list-caching\n\n" + "To resolve, run `list_symbols` through to completion frequently. " + "Note: write access to storage is required for compaction. " + "{}.\n" + "Note: This warning will only appear once.\n", + will_attempt_compaction + ); + data.warned_expected_slowdown_ = true; + } + add_journal_entry(result.update_map, atom_key); + } + }); + + sort_journal_map(result.update_map); + return result; } auto tail_range(const std::vector& updated) { @@ -393,15 +410,66 @@ ProblematicResult is_problematic( return ProblematicResult{latest}; } -CollectionType merge_existing_with_journal_keys( +void resolve_problematic_symbols( + const std::shared_ptr& version_map, const std::shared_ptr& store, + std::map>& problematic_symbols, CollectionType& symbols +) { + if (problematic_symbols.empty()) + return; + + auto symbol_versions = std::make_shared>(); + for (const auto& [symbol, reference_pair] : problematic_symbols) + symbol_versions->emplace_back(symbol); + + auto versions = batch_check_latest_id_and_status(store, version_map, symbol_versions); + + for (const auto& [symbol, reference_pair] : problematic_symbols) { + auto reference_id = reference_pair.first; + + if (auto version = versions->find(symbol); version != versions->end()) { + const auto& symbol_state = version->second; + if (symbol_state.exists_) { + ARCTICDB_DEBUG( + log::symbol(), + "Problematic symbol/version pair: {}@{}: exists at id {}", + symbol, + reference_id, + symbol_state.version_id_ + ); + symbols.emplace_back(symbol, symbol_state.version_id_, symbol_state.timestamp_, ActionType::ADD); + } else { + symbols.emplace_back(symbol, symbol_state.version_id_, symbol_state.timestamp_, ActionType::DELETE); + ARCTICDB_DEBUG( + log::symbol(), + "Problematic symbol/version pair: {}@{}: deleted at id {}", + symbol, + reference_id, + symbol_state.version_id_ + ); + } + } else { + ARCTICDB_DEBUG( + log::symbol(), "Problematic symbol/version pair: {}@{}: cannot be found", symbol, reference_id + ); + symbols.emplace_back(symbol, reference_id, reference_pair.second, ActionType::DELETE); + } + } + std::sort(std::begin(symbols), std::end(symbols), [](const auto& l, const auto& r) { + return l.stream_id_ < r.stream_id_; + }); +} + +/// Merges journal entries (JournalEntryData map) with existing compacted or version-key symbols. +/// The map is read-only so it can be moved into LoadResult for later batch deletion. +CollectionType merge_existing_with_journal_map( const std::shared_ptr& version_map, const std::shared_ptr& store, - const std::vector& keys, std::vector&& existing + const JournalMapType& update_map, std::vector&& existing ) { auto existing_keys = std::move(existing); - auto update_map = load_journal_keys(keys); CollectionType symbols; std::map> problematic_symbols; + std::unordered_set seen_in_existing; const auto min_allowed_interval = ConfigsMap::instance()->get_int("SymbolList.MinIntervalNs", 100'000'000LL); for (auto& previous_entry : existing_keys) { @@ -418,126 +486,85 @@ CollectionType merge_existing_with_journal_keys( ); } else { util::check(!updated->second.empty(), "Unexpected empty entry for symbol {}", updated->first); - if (auto problematic_entry = is_problematic(previous_entry, updated->second, min_allowed_interval); + seen_in_existing.insert(stream_id); + std::vector entries; + entries.reserve(updated->second.size()); + for (const auto& ck : updated->second) + entries.push_back(to_symbol_entry_data(ck)); + if (auto problematic_entry = is_problematic(previous_entry, entries, min_allowed_interval); problematic_entry) { problematic_symbols.try_emplace( stream_id, std::make_pair(problematic_entry.reference_id(), problematic_entry.time()) ); } else { - const auto& last_entry = updated->second.rbegin(); - symbols.emplace_back( - updated->first, last_entry->reference_id_, last_entry->timestamp_, last_entry->action_ - ); + const auto last = to_symbol_entry_data(updated->second.back()); + symbols.emplace_back(updated->first, last.reference_id_, last.timestamp_, last.action_); } - update_map.erase(updated); } } - for (const auto& [symbol, entries] : update_map) { - ARCTICDB_DEBUG(log::symbol(), "{} {}", symbol, entries); + for (const auto& [symbol, ck_entries] : update_map) { + if (seen_in_existing.count(symbol) > 0) + continue; + std::vector entries; + entries.reserve(ck_entries.size()); + for (const auto& ck : ck_entries) + entries.push_back(to_symbol_entry_data(ck)); if (auto problematic_entry = is_problematic(entries, min_allowed_interval); problematic_entry) { problematic_symbols.try_emplace(symbol, problematic_entry.reference_id(), problematic_entry.time()); } else { - const auto& last_entry = entries.rbegin(); - symbols.emplace_back(symbol, last_entry->reference_id_, last_entry->timestamp_, last_entry->action_); + const auto last = to_symbol_entry_data(ck_entries.back()); + symbols.emplace_back(symbol, last.reference_id_, last.timestamp_, last.action_); } } - if (!problematic_symbols.empty()) { - auto symbol_versions = std::make_shared>(); - for (const auto& [symbol, reference_pair] : problematic_symbols) - symbol_versions->emplace_back(symbol); - - auto versions = batch_check_latest_id_and_status(store, version_map, symbol_versions); - - for (const auto& [symbol, reference_pair] : problematic_symbols) { - auto reference_id = reference_pair.first; - - if (auto version = versions->find(symbol); version != versions->end()) { - const auto& symbol_state = version->second; - if (symbol_state.exists_) { - ARCTICDB_DEBUG( - log::symbol(), - "Problematic symbol/version pair: {}@{}: exists at id {}", - symbol, - reference_id, - symbol_state.version_id_ - ); - symbols.emplace_back(symbol, symbol_state.version_id_, symbol_state.timestamp_, ActionType::ADD); - } else { - symbols.emplace_back(symbol, symbol_state.version_id_, symbol_state.timestamp_, ActionType::DELETE); - ARCTICDB_DEBUG( - log::symbol(), - "Problematic symbol/version pair: {}@{}: deleted at id {}", - symbol, - reference_id, - symbol_state.version_id_ - ); - } - } else { - ARCTICDB_DEBUG( - log::symbol(), "Problematic symbol/version pair: {}@{}: cannot be found", symbol, reference_id - ); - symbols.emplace_back(symbol, reference_id, reference_pair.second, ActionType::DELETE); - } - } - std::sort(std::begin(symbols), std::end(symbols), [](const auto& l, const auto& r) { - return l.stream_id_ < r.stream_id_; - }); - } - + resolve_problematic_symbols(version_map, store, problematic_symbols, symbols); return symbols; } -CollectionType load_from_symbol_list_keys( - const std::shared_ptr& version_map, const std::shared_ptr& store, - const std::vector& keys, const Compaction& compaction -) { - ARCTICDB_RUNTIME_DEBUG(log::symbol(), "Loading symbols from symbol list keys"); - - auto previous_compaction = read_from_storage(store, *compaction); - return merge_existing_with_journal_keys(version_map, store, keys, std::move(previous_compaction)); -} - -CollectionType load_from_version_keys( - const std::shared_ptr& version_map, const std::shared_ptr& store, - const std::vector& keys, SymbolListData& data, WillAttemptCompaction will_attempt_compaction -) { - ARCTICDB_RUNTIME_DEBUG(log::symbol(), "Loading symbols from version keys"); - auto previous_entries = load_previous_from_version_keys(store, data, will_attempt_compaction); - return merge_existing_with_journal_keys(version_map, store, keys, std::move(previous_entries)); -} - LoadResult attempt_load( const std::shared_ptr& version_map, const std::shared_ptr& store, SymbolListData& data, WillAttemptCompaction will_attempt_compaction ) { ARCTICDB_RUNTIME_DEBUG(log::symbol(), "Symbol list load attempt"); + const bool will_compact = will_attempt_compaction == WillAttemptCompaction::YES; + auto journal = load_journal_streaming(store, data, will_attempt_compaction); + LoadResult load_result; - load_result.symbol_list_keys_ = get_all_symbol_list_keys(store, data, will_attempt_compaction); - load_result.maybe_previous_compaction = last_compaction(load_result.symbol_list_keys_); + load_result.compaction_key_ = journal.compaction_key; + load_result.total_key_count_ = journal.total_key_count; + + if (journal.compaction_key) { + ARCTICDB_RUNTIME_DEBUG(log::symbol(), "Loading symbols from symbol list keys"); + auto existing = read_from_storage(store, *journal.compaction_key); + if (journal.update_map.empty()) { + load_result.symbols_ = + CollectionType(std::make_move_iterator(existing.begin()), std::make_move_iterator(existing.end())); + } else { + load_result.symbols_ = + merge_existing_with_journal_map(version_map, store, journal.update_map, std::move(existing)); + } + } else { + ARCTICDB_RUNTIME_DEBUG(log::symbol(), "Loading symbols from version keys"); + auto previous_entries = load_previous_from_version_keys(store, data, will_attempt_compaction); + load_result.symbols_ = + merge_existing_with_journal_map(version_map, store, journal.update_map, std::move(previous_entries)); + if (will_compact) { + // Verify every journal symbol we'd delete corresponds to a symbol in the merged output. + // Guards against silent data loss from merge bugs. + std::unordered_set symbols_in_merge; + for (const auto& entry : load_result.symbols_) + symbols_in_merge.emplace(entry.stream_id_); + for (const auto& [symbol, _] : journal.update_map) + util::check(symbols_in_merge.count(symbol) > 0, "Would delete unseen symbol {}", symbol); + } + } - if (load_result.maybe_previous_compaction) - load_result.symbols_ = load_from_symbol_list_keys( - version_map, store, load_result.symbol_list_keys_, *load_result.maybe_previous_compaction - ); - else { - load_result.symbols_ = load_from_version_keys( - version_map, store, load_result.symbol_list_keys_, data, will_attempt_compaction - ); - std::unordered_set keys_in_versions; - for (const auto& entry : load_result.symbols_) - keys_in_versions.emplace(entry.stream_id_); - - for (const auto& key : load_result.symbol_list_keys_) - util::check( - keys_in_versions.find(StreamId{std::get(key.start_index())}) != keys_in_versions.end(), - "Would delete unseen key {}", - key - ); + if (will_compact) { + load_result.old_compaction_keys_ = std::move(journal.compaction_keys); + load_result.update_map_ = std::move(journal.update_map); } - load_result.timestamp_ = store->current_timestamp(); return load_result; } @@ -624,12 +651,12 @@ StreamDescriptor delete_symbol_stream_descriptor(const StreamId& stream_id, cons } bool SymbolList::needs_compaction(const LoadResult& load_result) const { - if (!load_result.maybe_previous_compaction) { + if (!load_result.compaction_key_) { log::version().debug("Symbol list: needs_compaction=[true] as no previous compaction"); return true; } - auto n_keys = static_cast(load_result.symbol_list_keys_.size()); + auto n_keys = static_cast(load_result.total_key_count_); if (auto fixed = ConfigsMap::instance()->get_int("SymbolList.MaxDelta")) { auto result = n_keys > *fixed; log::version().debug( @@ -744,7 +771,7 @@ SegmentInMemory create_empty_segment(const StreamId& stream_id) { } VariantKey write_symbols( - const std::shared_ptr& store, const CollectionType& symbols, const StreamId& stream_id, + const std::shared_ptr& store, CollectionType symbols, const StreamId& stream_id, const StreamId& type_holder ) { ARCTICDB_RUNTIME_DEBUG(log::symbol(), "Writing {} symbols to symbol list cache", symbols.size()); @@ -772,26 +799,23 @@ void delete_keys(const std::shared_ptr& store, std::vector&& rem // Corner case: if the newly written Compaction key (exclude) has the same timestamp as an existing one // (e.g. when a previous compaction round failed in the deletion step), we don't want to delete the former if (atom_key != exclude) - variant_keys.emplace_back(atom_key); + variant_keys.emplace_back(std::move(atom_key)); } store->remove_keys_sync(variant_keys); } -bool has_recent_compaction( - const std::shared_ptr& store, - const std::optional::const_iterator>& maybe_previous_compaction -) { +bool has_recent_compaction(const std::shared_ptr& store, const std::optional& compaction_key) { bool found_last = false; bool has_newer = false; - if (maybe_previous_compaction.has_value()) { - // Symbol list keys source + if (compaction_key) { + // We found a compaction key during load. Re-scan to check two things: + // 1. The key we saw still exists (another process may have already replaced it). + // 2. A newer compaction key has appeared since we loaded (another process compacted concurrently). store->iterate_type( KeyType::SYMBOL_LIST, - [&found_last, - &has_newer, - &last_compaction_key = *maybe_previous_compaction.value()](const VariantKey& key) { + [&found_last, &has_newer, &last_compaction_key = *compaction_key](const VariantKey& key) { const auto& atom = to_atom(key); if (atom == last_compaction_key) found_last = true; @@ -801,7 +825,8 @@ bool has_recent_compaction( std::get(compaction_id) ); } else { - // Version keys source + // No compaction key was present during load. Check whether one has appeared since + // (another process may have compacted while we were loading). store->iterate_type( KeyType::SYMBOL_LIST, [&has_newer](const VariantKey&) { has_newer = true; }, @@ -809,7 +834,8 @@ bool has_recent_compaction( ); } - return (maybe_previous_compaction && !found_last) || has_newer; + // Abort our compaction if our key was replaced (!found_last) or a newer one exists (has_newer). + return (compaction_key && !found_last) || has_newer; } size_t SymbolList::compact(const std::shared_ptr& store) { @@ -817,7 +843,7 @@ size_t SymbolList::compact(const std::shared_ptr& store) { LoadResult load_result = ExponentialBackoff(100, 2000).go([this, &version_map, &store]() { return attempt_load(version_map, store, data_, WillAttemptCompaction::YES); }); - auto num_symbol_list_keys = load_result.symbol_list_keys_.size(); + auto num_symbol_list_keys = load_result.total_key_count_; ARCTICDB_RUNTIME_DEBUG(log::symbol(), "Forcing compaction. Obtaining lock..."); StorageLock lock{StringId{CompactionLockName}}; @@ -830,7 +856,7 @@ size_t SymbolList::compact(const std::shared_ptr& store) { } void SymbolList::compact_internal(const std::shared_ptr& store, LoadResult& load_result) const { - if (has_recent_compaction(store, load_result.maybe_previous_compaction)) { + if (has_recent_compaction(store, load_result.compaction_key_)) { // legacy arcticc symbol list entries don't get correctly listed when doing `iterate_type`, so can mess // up racing symbol list compaction detection. ARCTICDB_RUNTIME_DEBUG( @@ -838,10 +864,44 @@ void SymbolList::compact_internal(const std::shared_ptr& store, LoadResul "Symbol list compaction will be skipped: either a concurrent compaction was detected " "or there are legacy arcticc symbol list entries that cannot be verified." ); - } else { - auto written = write_symbols(store, load_result.symbols_, compaction_id, data_.type_holder_); - delete_keys(store, load_result.detach_symbol_list_keys(), std::get(written)); + return; + } + + auto written = write_symbols(store, std::move(load_result.symbols_), compaction_id, data_.type_holder_); + const auto& written_key = std::get(written); + + // Delete old compaction keys (typically 0–1 entries; exclude the newly written one). + auto& old_ck = load_result.old_compaction_keys_; + old_ck.erase( + std::remove_if( + old_ck.begin(), + old_ck.end(), + [&written_key](const VariantKey& vk) { return to_atom(vk) == written_key; } + ), + old_ck.end() + ); + if (!old_ck.empty()) + store->remove_keys_sync(std::move(old_ck)); + + // Reconstruct and delete journal keys in batches from the JournalMapType, freeing each symbol's + // entries as they are processed. Journal keys can never equal written_key (different id field). + static constexpr size_t kBatchSize = 10'000; + std::vector batch; + batch.reserve(kBatchSize); + for (auto it = load_result.update_map_.begin(); it != load_result.update_map_.end();) { + const auto& [symbol, ck_entries] = *it; + for (const auto& ck : ck_entries) { + batch.emplace_back(atom_key_from_journal_entry(symbol, ck)); + if (batch.size() == kBatchSize) { + store->remove_keys_sync(std::move(batch)); + batch.clear(); + batch.reserve(kBatchSize); + } + } + it = load_result.update_map_.erase(it); } + if (!batch.empty()) + store->remove_keys_sync(std::move(batch)); } } // namespace arcticdb diff --git a/cpp/arcticdb/version/symbol_list.hpp b/cpp/arcticdb/version/symbol_list.hpp index 723b6e73c40..a03dbed7dcd 100644 --- a/cpp/arcticdb/version/symbol_list.hpp +++ b/cpp/arcticdb/version/symbol_list.hpp @@ -17,12 +17,23 @@ #include namespace arcticdb { -struct SymbolListEntry; -struct SymbolEntryData; +enum class ActionType : uint8_t { ADD, DELETE }; + +// Compact 32-byte representation of a SYMBOL_LIST journal AtomKey. +// Stores only the fields needed to reconstruct the full AtomKey for deletion. +// The symbol (start_index / map key) is held separately by the containing JournalMapType. +struct JournalEntryData { + entity::VersionId key_version_id; // 8 bytes + timestamp creation_ts; // 8 bytes + entity::ContentHash content_hash; // 8 bytes + ActionType action; // 1 byte + bool is_new_style; // 1 byte + // 6 bytes implicit padding +}; + +using JournalMapType = std::unordered_map>; -using MapType = std::unordered_map>; -using Compaction = std::vector::const_iterator; -using MaybeCompaction = std::optional; +struct SymbolListEntry; using CollectionType = std::vector; enum class WillAttemptCompaction : uint8_t { @@ -31,13 +42,21 @@ enum class WillAttemptCompaction : uint8_t { NO_DISABLED // compaction explicitly disabled by caller }; +struct JournalResult { + std::optional compaction_key; + JournalMapType update_map; // JournalEntryData (32B/entry) for all journal entries + size_t total_key_count = 0; + std::vector compaction_keys; // VariantKeys of all compaction keys found during scan +}; + struct LoadResult { - std::vector symbol_list_keys_; - MaybeCompaction maybe_previous_compaction; + std::optional compaction_key_; CollectionType symbols_; - timestamp timestamp_ = 0L; - - std::vector&& detach_symbol_list_keys() { return std::move(symbol_list_keys_); } + size_t total_key_count_ = 0; + // Old compaction VariantKeys (typically 0–1); freed immediately after compact_internal deletes them. + std::vector old_compaction_keys_; + // Journal keys stored as JournalEntryData (32 B each); reconstructed in batches during compact_internal. + JournalMapType update_map_; }; struct SymbolListData { @@ -58,8 +77,6 @@ constexpr std::string_view DeleteSymbol = "__delete__"; constexpr VersionId unknown_version_id = std::numeric_limits::max(); -enum class ActionType : uint8_t { ADD, DELETE }; - inline StreamId action_id(ActionType action) { switch (action) { case ActionType::ADD: @@ -160,6 +177,13 @@ class SymbolList { } ); + // Build output before compaction — compact_internal moves symbols_ into write_symbols + R output; + for (const auto& entry : load_result.symbols_) { + if (entry.action_ == ActionType::ADD) + output.insert(entry.stream_id_); + } + if (will_attempt_compaction == WillAttemptCompaction::YES && needs_compaction(load_result)) { ARCTICDB_RUNTIME_DEBUG(log::symbol(), "Compaction necessary. Obtaining lock..."); try { @@ -180,12 +204,6 @@ class SymbolList { } } - R output; - for (const auto& entry : load_result.symbols_) { - if (entry.action_ == ActionType::ADD) - output.insert(entry.stream_id_); - } - return output; } diff --git a/cpp/arcticdb/version/test/benchmark_symbol_list.cpp b/cpp/arcticdb/version/test/benchmark_symbol_list.cpp new file mode 100644 index 00000000000..4ddee167a43 --- /dev/null +++ b/cpp/arcticdb/version/test/benchmark_symbol_list.cpp @@ -0,0 +1,328 @@ +/* Copyright 2026 Man Group Operations Limited + * + * Use of this software is governed by the Business Source License 1.1 included in the file licenses/BSL.txt. + * + * As of the Change Date specified in that file, in accordance with the Business Source License, use of this software + * will be governed by the Apache License, version 2.0. + */ + +#include +#include +#include + +#ifdef __linux__ +#include +#endif + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +using namespace arcticdb; + +std::shared_ptr create_s3_mock_store(const std::string& lib_name = "benchmark_sl") { + storage::s3::S3ApiInstance::instance(); + arcticdb::proto::storage::VariantStorage vs; + arcticdb::proto::s3_storage::Config cfg; + cfg.set_bucket_name("test-bucket"); + cfg.set_use_mock_storage_for_testing(true); + util::pack_to_any(cfg, *vs.mutable_config()); + + storage::LibraryPath path{lib_name.c_str(), "store"}; + auto storages = storage::create_storages(path, storage::OpenMode::DELETE, {vs}); + auto library = std::make_shared(path, std::move(storages)); + return std::make_shared>( + async::AsyncStore(library, codec::default_lz4_codec(), EncodingVersion::V1) + ); +} + +// Tracks peak heap usage by polling mallinfo2 from a background thread. +// Only functional on Linux with glibc; on other platforms it reports zero. +struct PeakHeapTracker { + std::atomic running_{false}; + std::atomic peak_uordblks_{0}; + size_t baseline_{0}; + std::thread sampler_; + + static size_t current_heap_bytes() { +#ifdef __linux__ +#if __GLIBC__ > 2 || (__GLIBC__ == 2 && __GLIBC_MINOR__ >= 33) + return mallinfo2().uordblks; +#else + return static_cast(mallinfo().uordblks); +#endif +#else + return 0; +#endif + } + + void start() { +#ifdef __linux__ + malloc_trim(0); +#endif + baseline_ = current_heap_bytes(); + peak_uordblks_ = baseline_; + running_ = true; + sampler_ = std::thread([this] { + while (running_.load(std::memory_order_relaxed)) { + auto current = current_heap_bytes(); + auto prev = peak_uordblks_.load(std::memory_order_relaxed); + while (current > prev && !peak_uordblks_.compare_exchange_weak(prev, current, std::memory_order_relaxed) + ) + ; + std::this_thread::sleep_for(std::chrono::microseconds(100)); + } + }); + } + + size_t stop() { + running_ = false; + if (sampler_.joinable()) + sampler_.join(); + auto current = current_heap_bytes(); + auto prev = peak_uordblks_.load(std::memory_order_relaxed); + if (current > prev) + peak_uordblks_.store(current, std::memory_order_relaxed); + return peak_uordblks_.load() - baseline_; + } +}; + +// Benchmarks peak memory usage of symbol list loading via the compaction path. +// +// Run with: --benchmark_time_unit=ms --benchmark_filter=BM_symbol_list_load + +static void BM_symbol_list_load(benchmark::State& state) { + auto store = std::make_shared(); + auto version_map = std::make_shared(); + auto num_symbols = state.range(0); + + // Write journal entries for num_symbols symbols + for (int64_t i = 0; i < num_symbols; ++i) { + SymbolList::add_symbol(store, StreamId{fmt::format("symbol_{:06d}", i)}, 0); + } + + // Force compaction on first load to create the compacted segment + ConfigsMap::instance()->set_int("SymbolList.MaxDelta", 0); + { + SymbolList sl{version_map}; + sl.load>(version_map, store, false); + } + + // Add a few journal entries to exercise the merge path + for (int64_t i = 0; i < 100; ++i) { + SymbolList::add_symbol(store, StreamId{fmt::format("symbol_{:06d}", i)}, 1); + } + + // Disable compaction during the benchmark iterations + ConfigsMap::instance()->unset_int("SymbolList.MaxDelta"); + + for (auto _ : state) { + PeakHeapTracker tracker; + tracker.start(); + + SymbolList sl{version_map}; + auto result = sl.load>(version_map, store, true); + + auto peak_delta = tracker.stop(); + auto retained = PeakHeapTracker::current_heap_bytes() - tracker.baseline_; + + state.counters["PeakMB"] = benchmark::Counter(static_cast(peak_delta) / (1024.0 * 1024.0)); + state.counters["RetainedMB"] = benchmark::Counter(static_cast(retained) / (1024.0 * 1024.0)); + state.counters["NumSymbols"] = benchmark::Counter(static_cast(result.size())); + + benchmark::DoNotOptimize(result); + } +} + +BENCHMARK(BM_symbol_list_load)->Arg(100'000)->Arg(300'000)->Unit(benchmark::kMillisecond)->Iterations(3); +BENCHMARK(BM_symbol_list_load)->Arg(1'000'000)->Unit(benchmark::kMillisecond)->Iterations(3); + +// Benchmarks peak memory with many uncompacted journal entries per symbol. +// This is the scenario where the AtomKey vector elimination matters most. +// +// Setup: N_SYMBOLS symbols, each with ENTRIES_PER_SYMBOL journal entries. + +static void BM_symbol_list_load_many_entries(benchmark::State& state) { + auto store = std::make_shared(); + auto version_map = std::make_shared(); + auto num_symbols = state.range(0); + auto entries_per_symbol = state.range(1); + + // Write journal entries: each symbol gets multiple versions + for (int64_t i = 0; i < num_symbols; ++i) { + for (int64_t v = 0; v < entries_per_symbol; ++v) { + SymbolList::add_symbol(store, StreamId{fmt::format("symbol_{:06d}", i)}, v); + } + } + + // Force compaction to create the compacted segment + ConfigsMap::instance()->set_int("SymbolList.MaxDelta", 0); + { + SymbolList sl{version_map}; + sl.load>(version_map, store, false); + } + + // Write more journal entries (uncompacted) + for (int64_t i = 0; i < num_symbols; ++i) { + for (int64_t v = 0; v < entries_per_symbol; ++v) { + SymbolList::add_symbol(store, StreamId{fmt::format("symbol_{:06d}", i)}, entries_per_symbol + v); + } + } + + ConfigsMap::instance()->unset_int("SymbolList.MaxDelta"); + + for (auto _ : state) { + PeakHeapTracker tracker; + tracker.start(); + + SymbolList sl{version_map}; + auto result = sl.load>(version_map, store, true); + + auto peak_delta = tracker.stop(); + auto retained = PeakHeapTracker::current_heap_bytes() - tracker.baseline_; + + state.counters["PeakMB"] = benchmark::Counter(static_cast(peak_delta) / (1024.0 * 1024.0)); + state.counters["RetainedMB"] = benchmark::Counter(static_cast(retained) / (1024.0 * 1024.0)); + state.counters["NumSymbols"] = benchmark::Counter(static_cast(result.size())); + state.counters["TotalEntries"] = benchmark::Counter(static_cast(num_symbols * entries_per_symbol)); + + benchmark::DoNotOptimize(result); + } +} + +BENCHMARK(BM_symbol_list_load_many_entries)->Args({1'000, 1'000})->Unit(benchmark::kMillisecond)->Iterations(1); +// Disabled in CI: 300K-symbol setup writes 600K journal entries (~25 s on CI hardware). +// Run locally: make bench-cpp FILTER=BM_symbol_list_load_many_entries/300000 +// BENCHMARK(BM_symbol_list_load_many_entries)->Args({300'000, 1})->Unit(benchmark::kMillisecond)->Iterations(1); + +// Benchmarks peak memory during a load that triggers compaction. +// This measures the compaction path where keys are collected for deletion. +// +// Setup: N_SYMBOLS symbols with ENTRIES_PER_SYMBOL uncompacted journal entries each. +// MaxDelta=0 forces compaction on every load. + +static void BM_symbol_list_compaction(benchmark::State& state) { + auto version_map = std::make_shared(); + auto num_symbols = state.range(0); + auto entries_per_symbol = state.range(1); + + // Setup outside the benchmark loop — write journal entries once + auto store = std::make_shared(); + for (int64_t i = 0; i < num_symbols; ++i) { + for (int64_t v = 0; v < entries_per_symbol; ++v) { + SymbolList::add_symbol(store, StreamId{fmt::format("symbol_{:06d}", i)}, v); + } + } + + ConfigsMap::instance()->set_int("SymbolList.MaxDelta", 0); + + for (auto _ : state) { + PeakHeapTracker tracker; + tracker.start(); + + SymbolList sl{version_map}; + auto result = sl.load>(version_map, store, false); + + auto peak_delta = tracker.stop(); + + state.counters["PeakMB"] = benchmark::Counter(static_cast(peak_delta) / (1024.0 * 1024.0)); + state.counters["NumSymbols"] = benchmark::Counter(static_cast(result.size())); + state.counters["TotalEntries"] = benchmark::Counter(static_cast(num_symbols * entries_per_symbol)); + + benchmark::DoNotOptimize(result); + } + + ConfigsMap::instance()->unset_int("SymbolList.MaxDelta"); +} + +BENCHMARK(BM_symbol_list_compaction) + ->Args({100, 100}) + // ->Args({10'000, 10}) + // ->Args({1'000, 100}) + // ->Args({1'000, 1'000}) + // ->Args({10'000, 100}) + ->Unit(benchmark::kMillisecond) + ->Iterations(1); + +// ---- S3 mock variants: exercise the full S3 storage layer (serialization, key parsing) ---- + +static void BM_symbol_list_load_s3(benchmark::State& state) { + auto store = create_s3_mock_store(); + auto version_map = std::make_shared(); + auto num_symbols = state.range(0); + + for (int64_t i = 0; i < num_symbols; ++i) { + SymbolList::add_symbol(store, StreamId{fmt::format("symbol_{:06d}", i)}, 0); + } + + ConfigsMap::instance()->set_int("SymbolList.MaxDelta", 0); + { + SymbolList sl{version_map}; + sl.load>(version_map, store, false); + } + + for (int64_t i = 0; i < 100; ++i) { + SymbolList::add_symbol(store, StreamId{fmt::format("symbol_{:06d}", i)}, 1); + } + + ConfigsMap::instance()->unset_int("SymbolList.MaxDelta"); + + for (auto _ : state) { + PeakHeapTracker tracker; + tracker.start(); + + SymbolList sl{version_map}; + auto result = sl.load>(version_map, store, true); + + auto peak_delta = tracker.stop(); + + state.counters["PeakMB"] = benchmark::Counter(static_cast(peak_delta) / (1024.0 * 1024.0)); + state.counters["NumSymbols"] = benchmark::Counter(static_cast(result.size())); + + benchmark::DoNotOptimize(result); + } +} + +BENCHMARK(BM_symbol_list_load_s3)->Arg(10'000)->Unit(benchmark::kMillisecond)->Iterations(1); + +[[maybe_unused]] static void BM_symbol_list_compaction_s3(benchmark::State& state) { + auto version_map = std::make_shared(); + auto num_symbols = state.range(0); + auto entries_per_symbol = state.range(1); + + auto store = create_s3_mock_store(); + for (int64_t i = 0; i < num_symbols; ++i) { + for (int64_t v = 0; v < entries_per_symbol; ++v) { + SymbolList::add_symbol(store, StreamId{fmt::format("symbol_{:06d}", i)}, v); + } + } + + ConfigsMap::instance()->set_int("SymbolList.MaxDelta", 0); + + for (auto _ : state) { + PeakHeapTracker tracker; + tracker.start(); + + SymbolList sl{version_map}; + auto result = sl.load>(version_map, store, false); + + auto peak_delta = tracker.stop(); + + state.counters["PeakMB"] = benchmark::Counter(static_cast(peak_delta) / (1024.0 * 1024.0)); + state.counters["NumSymbols"] = benchmark::Counter(static_cast(result.size())); + state.counters["TotalEntries"] = benchmark::Counter(static_cast(num_symbols * entries_per_symbol)); + + benchmark::DoNotOptimize(result); + } + + ConfigsMap::instance()->unset_int("SymbolList.MaxDelta"); +} + +BENCHMARK(BM_symbol_list_compaction_s3)->Args({1'000, 100})->Unit(benchmark::kMillisecond)->Iterations(1); diff --git a/cpp/arcticdb/version/test/test_symbol_list.cpp b/cpp/arcticdb/version/test/test_symbol_list.cpp index f377a2e6cd4..c632903205d 100644 --- a/cpp/arcticdb/version/test/test_symbol_list.cpp +++ b/cpp/arcticdb/version/test/test_symbol_list.cpp @@ -757,8 +757,14 @@ TEST_F(SymbolListSuite, BackwardsCompat) { auto old_keys = backwards_compat_get_all_symbol_list_keys(store); auto old_symbols = backwards_compat_get_symbols(store); + std::set old_key_set(old_keys.begin(), old_keys.end()); backwards_compat_compact(store, std::move(old_keys), old_symbols); + // delete_keys must remove every old journal/compaction key, leaving only the freshly written compaction key + auto keys_after_compact = backwards_compat_get_all_symbol_list_keys(store); + ASSERT_EQ(keys_after_compact.size(), 1); + ASSERT_EQ(old_key_set.count(keys_after_compact.front()), 0); + ASSERT_EQ(all_symbols_match(store, symbol_list, expected), true); for (auto i = 0U; i < 10; i += 2) { @@ -1057,3 +1063,95 @@ TEST_P(SymbolListRace, Run) { INSTANTIATE_TEST_SUITE_P(SymbolListSource, SymbolListRace, Combine(Values('S'), Bool(), Bool(), Bool())); // For version keys source (initial compaction), there's no old compaction key to remove: INSTANTIATE_TEST_SUITE_P(VersionKeysSource, SymbolListRace, Combine(Values('V'), Values(false), Bool(), Bool())); + +TEST_F(SymbolListSuite, DirectPathEquivalence) { + // Setup: add symbols and force compaction + for (int i = 0; i < 50; ++i) { + auto symbol = fmt::format("sym_{}", i); + SymbolList::add_symbol(store_, StreamId{symbol}, 0); + auto key = atom_key_builder().build(symbol, KeyType::TABLE_INDEX); + version_map_->write_version(store_, key, std::nullopt); + } + + ConfigsMap::instance()->set_int("SymbolList.MaxDelta", 0); + { + SymbolList sl{version_map_}; + sl.load>(version_map_, store_, false); + } + + // Add journal entries: new symbols + deletes of existing ones + for (int i = 40; i < 60; ++i) { + auto symbol = fmt::format("sym_{}", i); + SymbolList::add_symbol(store_, StreamId{symbol}, 1); + auto key = atom_key_builder().version_id(1).build(symbol, KeyType::TABLE_INDEX); + version_map_->write_version(store_, key, std::nullopt); + } + for (int i = 0; i < 10; ++i) { + auto symbol = fmt::format("sym_{}", i); + SymbolList::remove_symbol(store_, StreamId{symbol}, 1); + } + + ConfigsMap::instance()->unset_int("SymbolList.MaxDelta"); + + // Load via compaction-eligible path (will not actually compact since threshold is default) + SymbolList sl1{version_map_}; + auto compaction_result = sl1.load>(version_map_, store_, false); + + // Load via direct path (no_compaction=true) + SymbolList sl2{version_map_}; + auto direct_result = sl2.load>(version_map_, store_, true); + + EXPECT_EQ(compaction_result, direct_result); + // 50 original + 10 new = 60. The remove_symbol journal entries are resolved + // as ADD because the version map still shows those symbols as existing. + EXPECT_EQ(direct_result.size(), 60u); +} + +TEST_F(SymbolListSuite, DirectPathEquivalenceWithTrueDeletes) { + // Setup: add symbols and force compaction + for (int i = 0; i < 50; ++i) { + auto symbol = fmt::format("sym_{}", i); + SymbolList::add_symbol(store_, StreamId{symbol}, 0); + auto key = atom_key_builder().build(symbol, KeyType::TABLE_INDEX); + version_map_->write_version(store_, key, std::nullopt); + } + + ConfigsMap::instance()->set_int("SymbolList.MaxDelta", 0); + { + SymbolList sl{version_map_}; + sl.load>(version_map_, store_, false); + } + + // Add new symbols + for (int i = 50; i < 60; ++i) { + auto symbol = fmt::format("sym_{}", i); + SymbolList::add_symbol(store_, StreamId{symbol}, 1); + auto key = atom_key_builder().version_id(1).build(symbol, KeyType::TABLE_INDEX); + version_map_->write_version(store_, key, std::nullopt); + } + + // Delete symbols 0-9 via both the symbol list journal AND the version map + for (int i = 0; i < 10; ++i) { + auto symbol = fmt::format("sym_{}", i); + SymbolList::remove_symbol(store_, StreamId{symbol}, 1); + version_map_->tombstone_from_key_or_all(store_, StreamId{symbol}); + } + + ConfigsMap::instance()->unset_int("SymbolList.MaxDelta"); + + SymbolList sl1{version_map_}; + auto compaction_result = sl1.load>(version_map_, store_, false); + + SymbolList sl2{version_map_}; + auto direct_result = sl2.load>(version_map_, store_, true); + + EXPECT_EQ(compaction_result, direct_result); + // 50 original - 10 deleted + 10 new = 50 + EXPECT_EQ(direct_result.size(), 50u); + // Verify deleted symbols are absent + for (int i = 0; i < 10; ++i) + EXPECT_EQ(direct_result.count(StreamId{fmt::format("sym_{}", i)}), 0u); + // Verify new symbols are present + for (int i = 50; i < 60; ++i) + EXPECT_EQ(direct_result.count(StreamId{fmt::format("sym_{}", i)}), 1u); +} diff --git a/python/tests/unit/arcticdb/version_store/test_recursive_normalizers.py b/python/tests/unit/arcticdb/version_store/test_recursive_normalizers.py index ffe55218090..29b58fb7116 100644 --- a/python/tests/unit/arcticdb/version_store/test_recursive_normalizers.py +++ b/python/tests/unit/arcticdb/version_store/test_recursive_normalizers.py @@ -8,7 +8,7 @@ import numpy as np import arcticdb from arcticdb import QueryBuilder, LibraryOptions -from arcticdb.flattener import Flattener +from arcticdb.flattener import Flattener, DEFAULT_RECURSE_LIMIT from arcticdb.version_store._custom_normalizers import ( CustomNormalizer, register_normalizer, @@ -392,12 +392,15 @@ def test_deep_nesting_metastruct_size_over_limit(lmdb_version_store_v1, all_recu key = "reasonable_length_key" data = {key: pd.DataFrame({"col": [0]})} - nesting_levels = 256 + max_levels = DEFAULT_RECURSE_LIMIT // 2 + nesting_levels = max_levels + 1 for i in range(nesting_levels - 1): data[key] = {key: data[key]} # When & Then - with pytest.raises(DataTooNestedException, match=r"^Symbol sym cannot be recursively normalized.*255 levels.*"): + with pytest.raises( + DataTooNestedException, match=rf"^Symbol sym cannot be recursively normalized.*{max_levels} levels.*" + ): lib.write(sym, data, recursive_normalizers=True) @@ -408,7 +411,7 @@ def test_deep_nesting_metastruct_size_under_limit(lmdb_version_store_v1, all_rec key = "reasonable_length_key" data = {key: pd.DataFrame({"col": [0]})} - nesting_levels = 255 + nesting_levels = DEFAULT_RECURSE_LIMIT // 2 for i in range(nesting_levels - 1): data[key] = {key: data[key]}