diff --git a/cpp/arcticdb/version/local_versioned_engine.cpp b/cpp/arcticdb/version/local_versioned_engine.cpp index 1a1b9de91e5..c4326938ad2 100644 --- a/cpp/arcticdb/version/local_versioned_engine.cpp +++ b/cpp/arcticdb/version/local_versioned_engine.cpp @@ -36,6 +36,87 @@ bool is_valid_merge_strategy(const arcticdb::MergeStrategy& strategy) { }; return std::ranges::find(valid_strategies, strategy) != std::ranges::end(valid_strategies); } +template +auto retry_read_on_concurrent_prune( + const std::shared_ptr& store, const std::shared_ptr& version_map, const StreamId& stream_id, + const VersionQuery& version_query, Fn&& read_fn +) { + // Retries are only useful for latest-version reads (std::monostate): for pinned queries the + // target version is gone and re-resolution would silently return a different version. + if (!std::holds_alternative(version_query.content_)) + return read_fn(); + try { + return read_fn(); + } catch (const storage::NoDataFoundException&) { + if (!version_map->invalidate_if_version_ref_changed(store, stream_id)) + throw; + } catch (const storage::KeyNotFoundException&) { + if (!version_map->invalidate_if_version_ref_changed(store, stream_id)) + throw; + } + log::version().info("Read of symbol '{}' raced with a concurrent prune; version ref changed, retrying", stream_id); + return read_fn(); +} + +// Multi-symbol variant for batch_read_and_join, where one failed symbol read fails the whole +// operation (so there is no per-symbol Try to retry). Otherwise identical to the single-symbol +// overload: retries once if any latest-version symbol's version ref changed. read_fn must run on the +// caller thread (it blocks on nested folly futures). +template +auto retry_read_on_concurrent_prune( + const std::shared_ptr& store, const std::shared_ptr& version_map, + const std::vector& stream_ids, const std::vector& version_queries, Fn&& read_fn +) { + auto invalidate_any_latest = [&] { + bool any = false; + for (auto i = 0UL; i < stream_ids.size(); ++i) + if (std::holds_alternative(version_queries[i].content_)) + any |= version_map->invalidate_if_version_ref_changed(store, stream_ids[i]); + return any; + }; + if (std::none_of(version_queries.begin(), version_queries.end(), [](const VersionQuery& vq) { + return std::holds_alternative(vq.content_); + })) + return read_fn(); + try { + return read_fn(); + } catch (const storage::NoDataFoundException&) { + if (!invalidate_any_latest()) + throw; + } catch (const storage::KeyNotFoundException&) { + if (!invalidate_any_latest()) + throw; + } + log::version().info("Batch read raced with a concurrent prune; a version ref changed, retrying"); + return read_fn(); +} + +// For the post-collectAll stage of batch reads. For each result that is a retryable latest-version +// race (failed with a missing-key error on a latest-version query whose ref has since changed), it +// invalidates the cache, logs, and re-runs retry_fn(idx) — which performs the read synchronously and +// returns T — capturing its value or exception back into the result. All other results are left +// untouched. +// retry_fn(idx) MUST run on the caller thread, not inside a future continuation: it blocks on nested +// folly futures, which would deadlock the shared thread pools if called from an executor thread. +template +void retry_failed_reads_on_concurrent_prune( + const std::shared_ptr& store, const std::shared_ptr& version_map, + const std::vector& stream_ids, const std::vector& version_queries, + std::vector>& results, Fn&& retry_fn +) { + for (auto idx = 0UL; idx < results.size(); ++idx) { + auto& t = results[idx]; + if (t.hasValue() || !std::holds_alternative(version_queries[idx].content_) || + (!t.template hasException() && + !t.template hasException()) || + !version_map->invalidate_if_version_ref_changed(store, stream_ids[idx])) + continue; + log::version().info( + "Read of symbol '{}' raced with a concurrent prune; version ref changed, retrying", stream_ids[idx] + ); + results[idx] = folly::makeTryWith([&] { return retry_fn(idx); }); + } +} } // namespace namespace arcticdb::version_store { @@ -220,14 +301,16 @@ FrameAndDescriptor LocalVersionedEngine::read_column_stats_internal(const Versio ReadVersionOutput LocalVersionedEngine::read_column_stats_version_internal( const StreamId& stream_id, const VersionQuery& version_query ) { - auto versioned_item = get_version_to_read(stream_id, version_query); - missing_data::check( - versioned_item.has_value(), - "read_column_stats_version_internal: version not found for stream '{}'", - stream_id - ); - auto frame_and_descriptor = read_column_stats_internal(versioned_item.value()); - return ReadVersionOutput{std::move(versioned_item.value()), std::move(frame_and_descriptor)}; + return retry_read_on_concurrent_prune(store(), version_map(), stream_id, version_query, [&] { + auto versioned_item = get_version_to_read(stream_id, version_query); + missing_data::check( + versioned_item.has_value(), + "read_column_stats_version_internal: version not found for stream '{}'", + stream_id + ); + auto frame_and_descriptor = read_column_stats_internal(versioned_item.value()); + return ReadVersionOutput{std::move(versioned_item.value()), std::move(frame_and_descriptor)}; + }); } ColumnStats LocalVersionedEngine::get_column_stats_info_internal(const VersionedItem& versioned_item) { @@ -237,13 +320,15 @@ ColumnStats LocalVersionedEngine::get_column_stats_info_internal(const Versioned ColumnStats LocalVersionedEngine::get_column_stats_info_version_internal( const StreamId& stream_id, const VersionQuery& version_query ) { - auto versioned_item = get_version_to_read(stream_id, version_query); - missing_data::check( - versioned_item.has_value(), - "get_column_stats_info_version_internal: version not found for stream '{}'", - stream_id - ); - return get_column_stats_info_internal(versioned_item.value()); + return retry_read_on_concurrent_prune(store(), version_map(), stream_id, version_query, [&] { + auto versioned_item = get_version_to_read(stream_id, version_query); + missing_data::check( + versioned_item.has_value(), + "get_column_stats_info_version_internal: version not found for stream '{}'", + stream_id + ); + return get_column_stats_info_internal(versioned_item.value()); + }); } std::set LocalVersionedEngine::list_streams_internal( @@ -406,11 +491,12 @@ std::optional LocalVersionedEngine::get_version_to_read( } IndexRange LocalVersionedEngine::get_index_range(const StreamId& stream_id, const VersionQuery& version_query) { - auto version = get_version_to_read(stream_id, version_query); - if (!version) - return unspecified_range(); - - return index::get_index_segment_range(version->key_, store()); + return retry_read_on_concurrent_prune(store(), version_map(), stream_id, version_query, [&] { + auto version = get_version_to_read(stream_id, version_query); + if (!version) + return unspecified_range(); + return index::get_index_segment_range(version->key_, store()); + }); } VersionIdentifier get_version_identifier( @@ -432,11 +518,10 @@ VersionIdentifier get_version_identifier( return *version; } -ReadVersionWithNodesOutput LocalVersionedEngine::read_dataframe_version_internal( +ReadVersionWithNodesOutput LocalVersionedEngine::read_one_dataframe_version( const StreamId& stream_id, const VersionQuery& version_query, const std::shared_ptr& read_query, const ReadOptions& read_options, std::shared_ptr handler_data ) { - py::gil_scoped_release release_gil; const auto identifier = util::variant_match( version_query.content_, [&](const std::shared_ptr& preloaded_index_query) -> VersionIdentifier { @@ -480,6 +565,24 @@ ReadVersionWithNodesOutput LocalVersionedEngine::read_dataframe_version_internal } } +ReadVersionWithNodesOutput LocalVersionedEngine::read_dataframe_version_internal( + const StreamId& stream_id, const VersionQuery& version_query, const std::shared_ptr& read_query, + const ReadOptions& read_options, std::shared_ptr handler_data +) { + py::gil_scoped_release release_gil; + // A concurrent writer using prune_previous_version without background deletion physically deletes + // a superseded version's keys. A reader can resolve a version just before it is pruned and then + // fail to fetch its (now deleted) index/data keys. Rather than surface that race, invalidate the + // cached version chain so we re-resolve to the current latest version, and retry. + // Retries are only useful for latest-version reads (std::monostate): for pinned queries + // (specific version, timestamp, snapshot) the target version is gone and re-resolution would + // silently return a different version. A genuinely missing version raises E_NO_SUCH_VERSION + // (not caught here) and still fails fast. + return retry_read_on_concurrent_prune(store(), version_map(), stream_id, version_query, [&] { + return read_one_dataframe_version(stream_id, version_query, read_query, read_options, handler_data); + }); +} + VersionedItem LocalVersionedEngine::read_modify_write_internal( const StreamId& source_stream, const StreamId& target_stream, const VersionQuery& version_query, const std::shared_ptr& read_query, const ReadOptions& read_options, bool prune_previous_versions, @@ -487,20 +590,25 @@ VersionedItem LocalVersionedEngine::read_modify_write_internal( ) { py::gil_scoped_release release_gil; - std::optional source_version = get_version_to_read(source_stream, version_query); - user_input::check( - source_version.has_value(), "Could not find requested version for symbol \"{}\"", source_stream - ); - const WriteOptions write_options = get_write_options(); auto [maybe_prev, deleted] = ::arcticdb::get_latest_version(store(), version_map(), target_stream); const auto target_version = get_next_version_from_key(maybe_prev); - - VersionIdentifier resolved = - std::make_shared(read_index_key_without_column_stats(store(), source_version->key_)); - std::shared_ptr pipeline_context = - setup_pipeline_context(store(), resolved, *read_query, read_options); const IndexPartialKey target_partial_index_key{target_stream, target_version}; + + // Retry only the source read (pure reads — safe to retry). + // read_index_key_without_column_stats can throw if the source version's index key was + // pruned by a concurrent writer. setup_pipeline_context is unreachable on the failing + // attempt so read_query is never mutated before the retry. read_modify_write_impl + // (which writes) stays outside and is never replayed. + auto pipeline_context = retry_read_on_concurrent_prune(store(), version_map(), source_stream, version_query, [&] { + auto source_version = get_version_to_read(source_stream, version_query); + user_input::check( + source_version.has_value(), "Could not find requested version for symbol \"{}\"", source_stream + ); + VersionIdentifier resolved = + std::make_shared(read_index_key_without_column_stats(store(), source_version->key_)); + return setup_pipeline_context(store(), resolved, *read_query, read_options); + }); VersionedItem versioned_item = read_modify_write_impl( store(), read_query, @@ -606,14 +714,16 @@ DescriptorItem LocalVersionedEngine::read_descriptor_internal( const StreamId& stream_id, const VersionQuery& version_query, bool include_index_segment ) { ARCTICDB_SAMPLE(ReadDescriptor, 0) - auto version = get_version_to_read(stream_id, version_query); - missing_data::check( - version.has_value(), - "Unable to retrieve descriptor data. {}@{}: version not found", - stream_id, - version_query - ); - return get_descriptor(std::move(version->key_), include_index_segment).get(); + return retry_read_on_concurrent_prune(store(), version_map(), stream_id, version_query, [&] { + auto version = get_version_to_read(stream_id, version_query); + missing_data::check( + version.has_value(), + "Unable to retrieve descriptor data. {}@{}: version not found", + stream_id, + version_query + ); + return get_descriptor(std::move(version->key_), include_index_segment).get(); + }); } std::vector> LocalVersionedEngine::batch_read_descriptor_internal( @@ -628,6 +738,26 @@ std::vector> LocalVersionedEngine::batch ); } auto descriptors = folly::collectAll(descriptor_futures).get(); + + // A latest-version read can race a concurrent prune (see read_dataframe_version_internal). + retry_failed_reads_on_concurrent_prune( + store(), + version_map(), + stream_ids, + version_queries, + descriptors, + [&](size_t idx) -> DescriptorItem { + auto version = get_version_to_read(stream_ids[idx], version_queries[idx]); + missing_data::check( + version.has_value(), + "Unable to retrieve descriptor data. {}@{}: version not found", + stream_ids[idx], + version_queries[idx] + ); + return get_descriptor(std::move(version->key_), /*include_index_segment=*/false).get(); + } + ); + TransformBatchResultsFlags flags; flags.throw_on_error_ = batch_read_options.batch_throw_on_error(); return transform_batch_items_or_throw(std::move(descriptors), stream_ids, flags, version_queries); @@ -1518,6 +1648,24 @@ std::vector> LocalVersionedE ); } + // A latest-version read can race a concurrent prune (see read_dataframe_version_internal). + retry_failed_reads_on_concurrent_prune( + store(), + version_map(), + stream_ids, + version_queries, + all_results, + [&](size_t idx) -> ReadVersionWithNodesOutput { + return read_one_dataframe_version( + stream_ids[idx], + version_queries[idx], + read_queries.empty() ? std::make_shared() : read_queries[idx], + batch_read_options.at(idx), + handler_data + ); + } + ); + TransformBatchResultsFlags flags; flags.convert_no_data_found_to_key_not_found_ = true; flags.throw_on_error_ = batch_read_options.batch_throw_on_error(); @@ -1563,91 +1711,98 @@ MultiSymbolReadOutput LocalVersionedEngine::batch_read_and_join_internal( ) { py::gil_scoped_release release_gil; util::check(!clauses.empty(), "Cannot join with no joining clause provided"); - auto opt_index_key_futs = batch_get_versions_async(store(), version_map(), *stream_ids, *version_queries); - std::vector> symbol_processing_result_futs; - symbol_processing_result_futs.reserve(opt_index_key_futs.size()); - auto component_manager = std::make_shared(); - for (auto&& [idx, opt_index_key_fut] : folly::enumerate(opt_index_key_futs)) { - symbol_processing_result_futs.emplace_back( - std::move(opt_index_key_fut) - .thenValue([store = store(), - stream_ids, - version_queries, - read_query = - read_queries.empty() ? std::make_shared() : read_queries[idx], - idx, - read_options, - component_manager](std::optional&& opt_index_key) mutable { - auto version_info = get_version_identifier( - (*stream_ids)[idx], - (*version_queries)[idx], - read_options, - opt_index_key.has_value() - ? std::make_optional(std::move(*opt_index_key)) - : std::nullopt - ); - return read_and_process( - store, std::move(version_info), read_query, read_options, component_manager - ); - }) - ); - } - for (auto& clause : clauses) { - clause->set_component_manager(component_manager); - } + // Move clauses once — they cannot be moved twice. schedule_remaining_iterations erases processed + // clauses from *clauses_ptr in-place, so snapshot the full list and restore it on each attempt. auto clauses_ptr = std::make_shared>>(std::move(clauses)); - return folly::collect(symbol_processing_result_futs) - .via(&async::io_executor()) - .thenValueInline([this, handler_data, clauses_ptr, component_manager, read_options]( - std::vector&& symbol_processing_results - ) mutable { - auto [input_schemas, entity_ids, res_versioned_items, res_metadatas] = - unpack_symbol_processing_results(std::move(symbol_processing_results)); - auto pipeline_context = setup_join_pipeline_context(std::move(input_schemas), *clauses_ptr); - return schedule_remaining_iterations(std::move(entity_ids), clauses_ptr) - .thenValueInline([component_manager](std::vector&& processed_entity_ids) { - auto proc = gather_entities< - std::shared_ptr, - std::shared_ptr, - std::shared_ptr>(*component_manager, std::move(processed_entity_ids)); - return collect_segments(std::move(proc)); - }) - .thenValueInline([store = store(), handler_data, pipeline_context, read_options]( - std::vector&& slice_and_keys - ) mutable { - return prepare_output_frame( - std::move(slice_and_keys), pipeline_context, store, read_options, handler_data - ); - }) - .thenValueInline([handler_data, - pipeline_context, - res_versioned_items, - res_metadatas, - read_options](SegmentInMemory&& frame) mutable { - // Needed to force our usual backfilling behaviour when columns have been outer-joined and - // some are not present in all input symbols - ReadOptions read_options_with_dynamic_schema = read_options.clone(); - read_options_with_dynamic_schema.set_dynamic_schema(true); - return reduce_and_fix_columns( - pipeline_context, frame, read_options_with_dynamic_schema, handler_data - ) - .thenValueInline([pipeline_context, - frame, - res_versioned_items, - res_metadatas](auto&&) mutable { - return MultiSymbolReadOutput{ - std::move(*res_versioned_items), - std::move(*res_metadatas), - {frame, - timeseries_descriptor_from_pipeline_context( - pipeline_context, {}, pipeline_context->bucketize_dynamic_ - ), - {}} - }; - }); - }); - }) - .get(); + const auto original_clauses = *clauses_ptr; + + return retry_read_on_concurrent_prune(store(), version_map(), *stream_ids, *version_queries, [&] { + *clauses_ptr = original_clauses; + auto opt_index_key_futs = batch_get_versions_async(store(), version_map(), *stream_ids, *version_queries); + std::vector> symbol_processing_result_futs; + symbol_processing_result_futs.reserve(opt_index_key_futs.size()); + auto component_manager = std::make_shared(); + for (auto&& [idx, opt_index_key_fut] : folly::enumerate(opt_index_key_futs)) { + symbol_processing_result_futs.emplace_back( + std::move(opt_index_key_fut) + .thenValue([store = store(), + stream_ids, + version_queries, + read_query = read_queries.empty() ? std::make_shared() + : read_queries[idx], + idx, + read_options, + component_manager](std::optional&& opt_index_key) mutable { + auto version_info = get_version_identifier( + (*stream_ids)[idx], + (*version_queries)[idx], + read_options, + opt_index_key.has_value() + ? std::make_optional(std::move(*opt_index_key)) + : std::nullopt + ); + return read_and_process( + store, std::move(version_info), read_query, read_options, component_manager + ); + }) + ); + } + for (auto& clause : *clauses_ptr) { + clause->set_component_manager(component_manager); + } + return folly::collect(symbol_processing_result_futs) + .via(&async::io_executor()) + .thenValueInline([this, handler_data, clauses_ptr, component_manager, read_options]( + std::vector&& symbol_processing_results + ) mutable { + auto [input_schemas, entity_ids, res_versioned_items, res_metadatas] = + unpack_symbol_processing_results(std::move(symbol_processing_results)); + auto pipeline_context = setup_join_pipeline_context(std::move(input_schemas), *clauses_ptr); + return schedule_remaining_iterations(std::move(entity_ids), clauses_ptr) + .thenValueInline([component_manager](std::vector&& processed_entity_ids) { + auto proc = gather_entities< + std::shared_ptr, + std::shared_ptr, + std::shared_ptr>(*component_manager, std::move(processed_entity_ids)); + return collect_segments(std::move(proc)); + }) + .thenValueInline([store = store(), handler_data, pipeline_context, read_options]( + std::vector&& slice_and_keys + ) mutable { + return prepare_output_frame( + std::move(slice_and_keys), pipeline_context, store, read_options, handler_data + ); + }) + .thenValueInline([handler_data, + pipeline_context, + res_versioned_items, + res_metadatas, + read_options](SegmentInMemory&& frame) mutable { + // Needed to force our usual backfilling behaviour when columns have been + // outer-joined and some are not present in all input symbols + ReadOptions read_options_with_dynamic_schema = read_options.clone(); + read_options_with_dynamic_schema.set_dynamic_schema(true); + return reduce_and_fix_columns( + pipeline_context, frame, read_options_with_dynamic_schema, handler_data + ) + .thenValueInline([pipeline_context, + frame, + res_versioned_items, + res_metadatas](auto&&) mutable { + return MultiSymbolReadOutput{ + std::move(*res_versioned_items), + std::move(*res_metadatas), + {frame, + timeseries_descriptor_from_pipeline_context( + pipeline_context, {}, pipeline_context->bucketize_dynamic_ + ), + {}} + }; + }); + }); + }) + .get(); + }); } void LocalVersionedEngine::write_version_and_prune_previous( @@ -2306,6 +2461,27 @@ std::vector std::pair> { + auto version = get_version_to_read(stream_ids[idx], version_queries[idx]); + missing_data::check( + version.has_value(), + "Unable to retrieve metadata. {}@{}: version not found", + stream_ids[idx], + version_queries[idx] + ); + auto [opt_key, meta_proto] = get_metadata(std::make_optional(version->key_)).get(); + return std::make_pair(std::move(*opt_key), std::move(meta_proto)); + } + ); + // For legacy reason read_metadata_batch is not throwing if the symbol is missing TransformBatchResultsFlags flags; flags.throw_on_missing_symbol_ = false; @@ -2316,9 +2492,11 @@ std::vector, std::optional> LocalVersionedEngine::read_metadata_internal( const StreamId& stream_id, const VersionQuery& version_query ) { - auto version = get_version_to_read(stream_id, version_query); - std::optional key = version.has_value() ? std::make_optional(version->key_) : std::nullopt; - return get_metadata(std::move(key)).get(); + return retry_read_on_concurrent_prune(store(), version_map(), stream_id, version_query, [&] { + auto version = get_version_to_read(stream_id, version_query); + std::optional key = version.has_value() ? std::make_optional(version->key_) : std::nullopt; + return get_metadata(std::move(key)).get(); + }); } std::variant LocalVersionedEngine::sort_merge_internal( diff --git a/cpp/arcticdb/version/local_versioned_engine.hpp b/cpp/arcticdb/version/local_versioned_engine.hpp index ed103ec13b4..9d1c64e5904 100644 --- a/cpp/arcticdb/version/local_versioned_engine.hpp +++ b/cpp/arcticdb/version/local_versioned_engine.hpp @@ -467,6 +467,11 @@ class LocalVersionedEngine : public VersionedEngine { ); private: + ReadVersionWithNodesOutput read_one_dataframe_version( + const StreamId& stream_id, const VersionQuery& version_query, const std::shared_ptr& read_query, + const ReadOptions& read_options, std::shared_ptr handler_data + ); + void initialize(const std::shared_ptr& library); void add_to_symbol_list_on_compaction( const StreamId& stream_id, const CompactIncompleteParameters& parameters, const UpdateInfo& update_info diff --git a/cpp/arcticdb/version/test/test_version_map.cpp b/cpp/arcticdb/version/test/test_version_map.cpp index 7fa4b2be568..9469d11c54f 100644 --- a/cpp/arcticdb/version/test/test_version_map.cpp +++ b/cpp/arcticdb/version/test/test_version_map.cpp @@ -197,6 +197,37 @@ TEST(VersionMap, PingPong) { ASSERT_EQ(right_result, expected); } +TEST(VersionMap, InvalidateIfVersionRefChangedReturnsTrueWhenChangedFalseWhenUnchanged) { + auto store = std::make_shared(); + StreamId id{"test_reload_if_changed"}; + auto reader = std::make_shared(); + reader->set_validate(true); + auto writer = std::make_shared(); + writer->set_validate(true); + + ScopedConfig sc("VersionMap.ReloadInterval", 2'000'000'000'000); + + auto key1 = atom_key_builder().version_id(1).creation_ts(2).content_hash(3).start_index(4).end_index(5).build( + id, KeyType::TABLE_INDEX + ); + writer->write_version(store, key1, std::nullopt); + ASSERT_EQ(get_latest_undeleted_version(store, reader, id).value(), key1); + + // Writer advances the chain to v2; the reader's sticky cache cannot see it. + auto key2 = atom_key_builder().version_id(2).creation_ts(3).content_hash(4).start_index(5).end_index(6).build( + id, KeyType::TABLE_INDEX + ); + writer->write_version(store, key2, key1); + ASSERT_EQ(get_latest_undeleted_version(store, reader, id).value(), key1); // still stale + + // Ref has changed: method returns true and reader now sees v2. + ASSERT_TRUE(reader->invalidate_if_version_ref_changed(store, id)); + ASSERT_EQ(get_latest_undeleted_version(store, reader, id).value(), key2); + + // Ref is now current: calling again returns false (no change, no spurious retry). + ASSERT_FALSE(reader->invalidate_if_version_ref_changed(store, id)); +} + TEST(VersionMap, TestLoadsRefAndIteration) { auto store = std::make_shared(); StreamId id{"test1"}; diff --git a/cpp/arcticdb/version/version_core.cpp b/cpp/arcticdb/version/version_core.cpp index 816901e60a5..9e092116bf1 100644 --- a/cpp/arcticdb/version/version_core.cpp +++ b/cpp/arcticdb/version/version_core.cpp @@ -2189,7 +2189,9 @@ FrameAndDescriptor read_column_stats_impl(const std::shared_ptr& store, c tsd.set_stream_descriptor(segment_in_memory.descriptor()); return {SegmentInMemory(std::move(segment_in_memory)), tsd, {}}; } catch (const std::exception& e) { - storage::raise("Failed to read column stats key: {}", e.what()); + // KeyNotFoundException (not the generic E_KEY_NOT_FOUND raise) so a concurrent-prune race is + // catchable by the read-retry primitive; both map to E_KEY_NOT_FOUND for callers. + throw storage::KeyNotFoundException(fmt::format("Failed to read column stats key: {}", e.what())); } } @@ -2211,7 +2213,9 @@ ColumnStats get_column_stats_info_impl(const std::shared_ptr& store, cons util::check(unpacked, "Failed to unpack column stats header while getting column stats info"); return ColumnStats{column_stats_header, tsd}; } catch (const std::exception& e) { - storage::raise("Failed to read column stats key: {}", e.what()); + // KeyNotFoundException (not the generic E_KEY_NOT_FOUND raise) so a concurrent-prune race is + // catchable by the read-retry primitive; both map to E_KEY_NOT_FOUND for callers. + throw storage::KeyNotFoundException(fmt::format("Failed to read column stats key: {}", e.what())); } } diff --git a/cpp/arcticdb/version/version_map.hpp b/cpp/arcticdb/version/version_map.hpp index 387c602ef4e..0c7d3d7aa0e 100644 --- a/cpp/arcticdb/version/version_map.hpp +++ b/cpp/arcticdb/version/version_map.hpp @@ -216,6 +216,27 @@ class VersionMapImpl { map_.clear(); } + // Returns true if the VERSION_REF head for stream_id differs from the cached head, in which + // case the cache is reloaded via storage_reload so the retry is a cache hit. + // Returns false if the ref is unchanged — missing data is genuine, not a write race. + bool invalidate_if_version_ref_changed(std::shared_ptr store, const StreamId& stream_id) { + VersionMapEntry ref_entry(stream_id); + read_symbol_ref(store, stream_id, ref_entry); + + { + std::lock_guard lock(map_mutex_); + auto it = map_.find(stream_id); + if (it == map_.end()) + return true; + if (it->second->head_ == ref_entry.head_) + return false; + } + + static const LoadStrategy load_latest{LoadType::LATEST, LoadObjective::UNDELETED_ONLY}; + storage_reload(store, stream_id, load_latest); + return true; + } + void load_via_iteration( std::shared_ptr store, const StreamId& stream_id, std::shared_ptr& entry, bool use_index_keys_for_iteration = false diff --git a/python/tests/stress/arcticdb/version_store/test_concurrent_read_and_write.py b/python/tests/stress/arcticdb/version_store/test_concurrent_read_and_write.py index 4452e8b1b11..61ec902ca5a 100644 --- a/python/tests/stress/arcticdb/version_store/test_concurrent_read_and_write.py +++ b/python/tests/stress/arcticdb/version_store/test_concurrent_read_and_write.py @@ -2,6 +2,8 @@ import pytest from multiprocessing import Process, Queue +from tests.util.mark import RUNS_ON_GITHUB + @pytest.fixture def writer_store(lmdb_version_store_delayed_deletes_v2): @@ -13,6 +15,16 @@ def reader_store(lmdb_version_store_delayed_deletes_v2): return lmdb_version_store_delayed_deletes_v2 +@pytest.fixture +def eager_writer_store(lmdb_version_store_v2): + return lmdb_version_store_v2 + + +@pytest.fixture +def eager_reader_store(lmdb_version_store_v2): + return lmdb_version_store_v2 + + def read_repeatedly(version_store, queue: Queue): while True: try: @@ -47,3 +59,30 @@ def test_concurrent_read_write(writer_store, reader_store): reader.terminate() assert exceptions_in_reader.empty() + + +@pytest.mark.skipif( + RUNS_ON_GITHUB, reason="Non-deterministic timing; covered by deterministic unit tests in test_read_retry.py" +) +def test_concurrent_read_write_eager_prune(eager_writer_store, eager_reader_store): + """Without delayed deletes, prune_previous physically deletes superseded versions as soon as a + new version is written. A concurrent reader can resolve a version just before it is pruned out + from under it, so the read path retries: when the data/index keys it expected are gone, it + re-resolves to the current version and reads again. The reader should therefore always return + data and never surface a missing-key error, even while another process writes and eagerly + prunes in a tight loop.""" + eager_writer_store.write("sym", [1, 2, 3], prune_previous_version=True) + exceptions_in_reader = Queue() + reader = Process(target=read_repeatedly, args=(eager_reader_store, exceptions_in_reader)) + writer = Process(target=write_repeatedly, args=(eager_writer_store,)) + + try: + reader.start() + writer.start() + reader.join(5) + writer.join(0.001) + finally: + writer.terminate() + reader.terminate() + + assert exceptions_in_reader.empty() diff --git a/python/tests/unit/arcticdb/version_store/test_read_retry.py b/python/tests/unit/arcticdb/version_store/test_read_retry.py new file mode 100644 index 00000000000..ce8b788073c --- /dev/null +++ b/python/tests/unit/arcticdb/version_store/test_read_retry.py @@ -0,0 +1,423 @@ +""" +Copyright 2026 Man Group Operations Limited + +Use of this software is governed by the Business Source License 1.1 included in the file +licenses/BSL.txt. + +As of the Change Date specified in that file, in accordance with the Business Source License, +use of this software will be governed by the Apache License, version 2.0. + +Deterministic tests for the read-retry behaviour in read_dataframe_version_internal. + +A reader can resolve a version from its (stale) cached version chain just before a concurrent writer +supersedes it and eagerly prunes its keys. The read then re-resolves the symbol to the current +version and retries instead of surfacing a missing-key error. These tests reproduce that race +deterministically using two store handles to the same storage (separate version-map caches) plus a +very large reload interval that pins the reader's cache. +""" + +import pandas as pd +import pytest + +import arcticdb.toolbox.query_stats as qs +from arcticdb.exceptions import KeyNotFoundException, NoDataFoundException +from arcticdb.util.test import config_context, query_stats_operation_count + +# 2**62 nanoseconds (~146 years): effectively infinite — the cached version chain never expires. +STICKY_RELOAD_INTERVAL = 2**62 + + +def _df(value): + return pd.DataFrame({"a": [value]}) + + +def _version_ref_reads(stats): + return query_stats_operation_count(stats, "Memory_GetObject", "VERSION_REF") + + +def _version_reads(stats): + return query_stats_operation_count(stats, "Memory_GetObject", "VERSION") + + +def _index_reads(stats): + return query_stats_operation_count(stats, "Memory_GetObject", "TABLE_INDEX") + + +def test_read_retries_when_latest_version_pruned_concurrently(in_memory_store_factory): + """Reader holding a stale cache (latest = v0) reads while a writer creates v1 and eagerly prunes + v0. The reader must transparently re-resolve to v1 and return its data, not raise.""" + writer = in_memory_store_factory() + reader = in_memory_store_factory(reuse_name=True) + sym = "sym" + + with config_context("VersionMap.ReloadInterval", STICKY_RELOAD_INTERVAL): + writer.write(sym, _df(0)) + # Warm the reader's cache so it resolves latest = v0 without re-reading the ref. + assert reader.read(sym).data["a"][0] == 0 + + # Concurrent writer supersedes v0 and physically prunes it. + writer.write(sym, _df(1), prune_previous_version=True) + + # Reader's cache still points at v0, whose keys are now gone; the retry must recover. + result = reader.read(sym) + + assert result.version == 1 + assert result.data["a"][0] == 1 + + +def test_read_does_not_retry_genuinely_missing_version(in_memory_store_factory): + """A read for a version that never existed must still fail fast with NoSuchVersion rather than + being swallowed by the retry loop (which only catches missing-key errors).""" + from arcticdb.exceptions import NoSuchVersionException + + lib = in_memory_store_factory() + lib.write("sym", _df(0)) + with pytest.raises(NoSuchVersionException): + lib.read("sym", as_of=5) + + +def test_retry_only_rereads_the_raced_symbols_version_ref(in_memory_store_factory, clear_query_stats): + """The retry must be cheap and scoped: recovering a single raced symbol should re-read only that + symbol's version ref and must NOT evict the cached chains of other symbols. This is what + distinguishes the per-symbol invalidation from a global flush().""" + writer = in_memory_store_factory() + reader = in_memory_store_factory(reuse_name=True) + + with config_context("VersionMap.ReloadInterval", STICKY_RELOAD_INTERVAL): + writer.write("raced", _df(0)) + writer.write("other", _df(100)) + # Warm the reader's cache for both symbols. + assert reader.read("raced").data["a"][0] == 0 + assert reader.read("other").data["a"][0] == 100 + + # Concurrent writer supersedes and eagerly prunes "raced" only. + writer.write("raced", _df(1), prune_previous_version=True) + + qs.enable() + qs.reset_stats() + raced_result = reader.read("raced") # stale cache -> one retry + raced_stats = qs.get_query_stats() + + qs.reset_stats() + other_result = reader.read("other") # must still be a pure cache hit + other_stats = qs.get_query_stats() + qs.disable() + + assert raced_result.data["a"][0] == 1 + # One retry: two ref reads — one to detect the head change, one inside storage_reload to + # rebuild the cache entry (LOAD_LATEST shortcut, no VERSION-chain traversal). The retry's + # check_reload is then a pure cache hit — no further ref read, no VERSION reads. + # Two TABLE_INDEX reads: one failed attempt on v0's (pruned) key, one successful read of v1. + assert _version_ref_reads(raced_stats) == 2 + assert _version_reads(raced_stats) == 0 + assert _index_reads(raced_stats) == 2 + # The unrelated symbol's cached chain survived: no version-chain reads needed. + assert _version_ref_reads(other_stats) == 0 + assert _version_reads(other_stats) == 0 + + +def test_retry_reads_are_bounded_regardless_of_live_versions(in_memory_store_factory, clear_query_stats): + """The read-retry overhead must be O(1) storage reads regardless of how many live versions + exist for the symbol. We write N versions without pruning (so the version chain is N items + long), race the latest, and assert that the extra reads on retry are bounded—not + proportional to N.""" + writer = in_memory_store_factory() + reader = in_memory_store_factory(reuse_name=True) + sym = "sym" + N = 15 # Large enough to detect O(N) behaviour if present + + with config_context("VersionMap.ReloadInterval", STICKY_RELOAD_INTERVAL): + for i in range(N): + writer.write(sym, _df(i), prune_previous_version=False) + + # Warm the reader's cache so it resolves latest = v{N-1} without re-reading the ref. + assert reader.read(sym).data["a"][0] == N - 1 + + # Concurrent writer creates v_N and eagerly prunes v_{N-1}. + writer.write(sym, _df(N), prune_previous_version=True) + + qs.enable() + qs.reset_stats() + result = reader.read(sym) # stale cache -> one retry + stats = qs.get_query_stats() + qs.disable() + + assert result.data["a"][0] == N + # One retry: exactly 2 VERSION_REF reads (one head-comparison check, one inside storage_reload + # which rebuilds the cache via the LOAD_LATEST shortcut). 0 VERSION reads, 0 extra + # TABLE_INDEX reads. O(1) regardless of N. + assert _version_ref_reads(stats) == 2 + assert _version_reads(stats) == 0 + + +def test_specific_version_read_does_not_retry_when_pruned(in_memory_store_factory): + """read(as_of=0) for a version that was pruned must not silently re-resolve to a later version. + Retries are only enabled for latest-version reads; pinned queries must fail immediately.""" + writer = in_memory_store_factory() + reader = in_memory_store_factory(reuse_name=True) + + with config_context("VersionMap.ReloadInterval", STICKY_RELOAD_INTERVAL): + writer.write("sym", _df(0)) + assert reader.read("sym").data["a"][0] == 0 # warm cache so reader sees v0 in stale chain + + writer.write("sym", _df(1), prune_previous_version=True) + + # The stale cache still resolves v0, but its keys are gone. Pinned queries never retry, + # so the error propagates immediately — no wrong-version return. + with pytest.raises((NoDataFoundException, KeyNotFoundException)): + reader.read("sym", as_of=0) + + +# Note: a snapshot test analogous to test_specific_version_read_does_not_retry_when_pruned is not +# included because ArcticDB protects snapshot-referenced keys from pruning — prune_previous_version +# leaves keys intact when they are held by a snapshot, so snapshot reads always succeed and the +# "no-retry" code path is not reachable through normal API usage. + + +# ---- batch_read ---- + + +def test_read_batch_recovers_when_latest_version_pruned_concurrently(in_memory_store_factory): + """batch_read recovers the same way as read when a concurrent prune races a latest-version read.""" + writer = in_memory_store_factory() + reader = in_memory_store_factory(reuse_name=True) + sym = "sym" + + with config_context("VersionMap.ReloadInterval", STICKY_RELOAD_INTERVAL): + writer.write(sym, _df(0)) + assert reader.read(sym).data["a"][0] == 0 + + writer.write(sym, _df(1), prune_previous_version=True) + + results = reader.batch_read([sym]) + + assert results[sym].data["a"][0] == 1 + + +def test_read_batch_partial_race_recovers_raced_symbol_only(in_memory_store_factory): + """A 2-symbol batch where only one symbol raced: that symbol recovers, the other is untouched.""" + writer = in_memory_store_factory() + reader = in_memory_store_factory(reuse_name=True) + + with config_context("VersionMap.ReloadInterval", STICKY_RELOAD_INTERVAL): + writer.write("raced", _df(0)) + writer.write("stable", _df(100)) + assert reader.read("raced").data["a"][0] == 0 + assert reader.read("stable").data["a"][0] == 100 + + writer.write("raced", _df(1), prune_previous_version=True) + + results = reader.batch_read(["raced", "stable"]) + + assert results["raced"].data["a"][0] == 1 + assert results["stable"].data["a"][0] == 100 + + +def test_read_batch_retry_is_bounded(in_memory_store_factory, clear_query_stats): + """Retry overhead for batch_read must be O(1) storage reads, matching the single-symbol retry.""" + writer = in_memory_store_factory() + reader = in_memory_store_factory(reuse_name=True) + sym = "sym" + N = 15 + + with config_context("VersionMap.ReloadInterval", STICKY_RELOAD_INTERVAL): + for i in range(N): + writer.write(sym, _df(i), prune_previous_version=False) + + assert reader.read(sym).data["a"][0] == N - 1 + + writer.write(sym, _df(N), prune_previous_version=True) + + qs.enable() + qs.reset_stats() + results = reader.batch_read([sym]) + stats = qs.get_query_stats() + qs.disable() + + assert results[sym].data["a"][0] == N + assert _version_ref_reads(stats) == 2 + assert _version_reads(stats) == 0 + + +# ---- read_batch_and_join ---- + + +def test_read_batch_and_join_recovers_when_latest_version_pruned_concurrently(in_memory_store_factory): + """batch_read_and_join recovers when a concurrent prune races a latest-version read.""" + from arcticdb import QueryBuilder + + writer = in_memory_store_factory() + reader = in_memory_store_factory(reuse_name=True) + sym_a, sym_b = "sym_a", "sym_b" + + with config_context("VersionMap.ReloadInterval", STICKY_RELOAD_INTERVAL): + writer.write(sym_a, _df(0)) + writer.write(sym_b, _df(100)) + assert reader.read(sym_a).data["a"][0] == 0 + assert reader.read(sym_b).data["a"][0] == 100 + + writer.write(sym_a, _df(1), prune_previous_version=True) + + q = QueryBuilder() + q = q.concat("outer") + result = reader.batch_read_and_join([sym_a, sym_b], q) + + assert len(result.versions) == 2 + + +# ---- read_metadata / read_metadata_batch ---- + + +def test_read_metadata_recovers_when_latest_version_pruned_concurrently(in_memory_store_factory): + """read_metadata recovers when a concurrent prune races a latest-version read.""" + writer = in_memory_store_factory() + reader = in_memory_store_factory(reuse_name=True) + sym = "sym" + + with config_context("VersionMap.ReloadInterval", STICKY_RELOAD_INTERVAL): + writer.write(sym, _df(0), metadata={"v": 0}) + assert reader.read_metadata(sym) is not None + + writer.write(sym, _df(1), metadata={"v": 1}, prune_previous_version=True) + + result = reader.read_metadata(sym) + + assert result.version == 1 + + +def test_read_metadata_batch_recovers_when_latest_version_pruned_concurrently(in_memory_store_factory): + """batch_read_metadata recovers when a concurrent prune races a latest-version read.""" + writer = in_memory_store_factory() + reader = in_memory_store_factory(reuse_name=True) + sym = "sym" + + with config_context("VersionMap.ReloadInterval", STICKY_RELOAD_INTERVAL): + writer.write(sym, _df(0), metadata={"v": 0}) + assert reader.read_metadata(sym) is not None + + writer.write(sym, _df(1), metadata={"v": 1}, prune_previous_version=True) + + results = reader.batch_read_metadata([sym]) + + assert sym in results + assert results[sym].version == 1 + + +# ---- get_info / batch_get_info (get_description / get_description_batch) ---- + + +def test_get_info_recovers_when_latest_version_pruned_concurrently(in_memory_store_factory): + """get_info (backing get_description) recovers when a concurrent prune races a latest-version read.""" + writer = in_memory_store_factory() + reader = in_memory_store_factory(reuse_name=True) + sym = "sym" + + with config_context("VersionMap.ReloadInterval", STICKY_RELOAD_INTERVAL): + writer.write(sym, _df(0)) + assert reader.get_info(sym) is not None + + writer.write(sym, _df(1), prune_previous_version=True) + + info = reader.get_info(sym) + + assert info is not None + assert info["rows"] == 1 + + +def test_batch_get_info_recovers_when_latest_version_pruned_concurrently(in_memory_store_factory): + """batch_get_info (backing get_description_batch) recovers when a concurrent prune races a latest-version read.""" + writer = in_memory_store_factory() + reader = in_memory_store_factory(reuse_name=True) + sym = "sym" + + with config_context("VersionMap.ReloadInterval", STICKY_RELOAD_INTERVAL): + writer.write(sym, _df(0)) + assert reader.get_info(sym) is not None + + writer.write(sym, _df(1), prune_previous_version=True) + + results = reader.batch_get_info([sym]) + + assert len(results) == 1 + assert results[0] is not None + assert results[0]["rows"] == 1 + + +# ---- read_modify_write ---- + + +def test_read_modify_write_recovers_and_writes_target_once(in_memory_store_factory): + """read_modify_write retries the source read when its version is pruned mid-flight, and the + target version must be written exactly once — the write is outside the retry, so a retried + source read must not produce a second target version.""" + from arcticdb import QueryBuilder + + writer = in_memory_store_factory() + reader = in_memory_store_factory(reuse_name=True) + src, dst = "src", "dst" + + with config_context("VersionMap.ReloadInterval", STICKY_RELOAD_INTERVAL): + writer.write(src, _df(0)) + assert reader.read(src).data["a"][0] == 0 # warm cache so reader sees src@v0 in stale chain + + writer.write(src, _df(1), prune_previous_version=True) + + q = QueryBuilder() + q = q[q["a"] >= 0] + reader._read_modify_write(src, q, dst) + + result = reader.read(dst) + + # Recovered the live source (v1), and the target was written exactly once (version 0). + assert result.data["a"][0] == 1 + assert result.version == 0 + + +# ---- read_column_stats / get_column_stats_info ---- + + +def test_read_column_stats_recovers_when_latest_version_pruned_concurrently(in_memory_store_factory): + """read_column_stats recovers when a concurrent prune races a latest-version read. The pruned + version's column-stats key is gone, so the read must re-resolve to the live version's stats.""" + writer = in_memory_store_factory() + reader = in_memory_store_factory(reuse_name=True) + sym = "sym" + stats = {"a": {"MINMAX"}} + + with config_context("VersionMap.ReloadInterval", STICKY_RELOAD_INTERVAL): + writer.write(sym, _df(0)) + writer.create_column_stats(sym, stats) + assert reader.read_column_stats(sym) is not None # warm cache so reader sees v0 in stale chain + + # Supersede and prune v0 (deletes v0's column-stats key too), then build stats for v1. + writer.write(sym, _df(1), prune_previous_version=True) + writer.create_column_stats(sym, stats) + + table = reader.read_column_stats(sym) + + assert table is not None + + +def test_get_column_stats_info_recovers_when_latest_version_pruned_concurrently(in_memory_store_factory): + """get_column_stats_info recovers when a concurrent prune races a latest-version read.""" + writer = in_memory_store_factory() + reader = in_memory_store_factory(reuse_name=True) + sym = "sym" + stats = {"a": {"MINMAX"}} + + with config_context("VersionMap.ReloadInterval", STICKY_RELOAD_INTERVAL): + writer.write(sym, _df(0)) + writer.create_column_stats(sym, stats) + assert reader.get_column_stats_info(sym) is not None + + writer.write(sym, _df(1), prune_previous_version=True) + writer.create_column_stats(sym, stats) + + info = reader.get_column_stats_info(sym) + + assert info == stats + + +# Note: get_index_range (LocalVersionedEngine::get_index_range) is also wrapped with the retry +# primitive, but it has no Python binding and no other caller, so there is no deterministic +# Python-level test for it. It follows the same pattern as the methods covered above.