Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
432 changes: 305 additions & 127 deletions cpp/arcticdb/version/local_versioned_engine.cpp

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions cpp/arcticdb/version/local_versioned_engine.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,11 @@ class LocalVersionedEngine : public VersionedEngine {
);

private:
// Private helper declared here; its definition is not part of this header hunk.
// NOTE(review): judging only by the signature, this presumably resolves `version_query` for
// `stream_id` and reads that single version, returning a ReadVersionWithNodesOutput — confirm
// against the definition in local_versioned_engine.cpp.
// `read_query` and `read_options` are taken by const reference; `handler_data` is taken by value.
ReadVersionWithNodesOutput read_one_dataframe_version(
const StreamId& stream_id, const VersionQuery& version_query, const std::shared_ptr<ReadQuery>& read_query,
const ReadOptions& read_options, std::shared_ptr<std::any> handler_data
);

void initialize(const std::shared_ptr<storage::Library>& library);
void add_to_symbol_list_on_compaction(
const StreamId& stream_id, const CompactIncompleteParameters& parameters, const UpdateInfo& update_info
Expand Down
31 changes: 31 additions & 0 deletions cpp/arcticdb/version/test/test_version_map.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,37 @@ TEST(VersionMap, PingPong) {
ASSERT_EQ(right_result, expected);
}

// Two VersionMap instances share one in-memory store: `writer` advances the version chain while
// `reader` keeps serving from its own cache. The test checks that
// invalidate_if_version_ref_changed() reports a changed ref (and makes the new version visible
// to the reader) and reports no change when called again straight afterwards.
TEST(VersionMap, InvalidateIfVersionRefChangedReturnsTrueWhenChangedFalseWhenUnchanged) {
    auto store = std::make_shared<InMemoryStore>();
    StreamId id{"test_reload_if_changed"};

    auto reader = std::make_shared<VersionMap>();
    reader->set_validate(true);
    auto writer = std::make_shared<VersionMap>();
    writer->set_validate(true);

    // A very large reload interval so the reader's cached entry is not refreshed on its own
    // for the duration of the test.
    ScopedConfig sc("VersionMap.ReloadInterval", 2'000'000'000'000);

    // What the reader currently believes is the latest undeleted version.
    auto latest_seen_by_reader = [&]() { return get_latest_undeleted_version(store, reader, id).value(); };

    // v1 is written and then observed (and therefore cached) by the reader.
    auto first_key =
            atom_key_builder().version_id(1).creation_ts(2).content_hash(3).start_index(4).end_index(5).build(
                    id, KeyType::TABLE_INDEX
            );
    writer->write_version(store, first_key, std::nullopt);
    ASSERT_EQ(latest_seen_by_reader(), first_key);

    // v2 is written through the writer only; the reader keeps answering with v1.
    auto second_key =
            atom_key_builder().version_id(2).creation_ts(3).content_hash(4).start_index(5).end_index(6).build(
                    id, KeyType::TABLE_INDEX
            );
    writer->write_version(store, second_key, first_key);
    ASSERT_EQ(latest_seen_by_reader(), first_key); // still stale

    // The ref moved on: the call reports a change and the reader now resolves to v2.
    ASSERT_TRUE(reader->invalidate_if_version_ref_changed(store, id));
    ASSERT_EQ(latest_seen_by_reader(), second_key);

    // Nothing changed since the reload: a second call reports false (no spurious retry).
    ASSERT_FALSE(reader->invalidate_if_version_ref_changed(store, id));
}

TEST(VersionMap, TestLoadsRefAndIteration) {
auto store = std::make_shared<InMemoryStore>();
StreamId id{"test1"};
Expand Down
8 changes: 6 additions & 2 deletions cpp/arcticdb/version/version_core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2189,7 +2189,9 @@ FrameAndDescriptor read_column_stats_impl(const std::shared_ptr<Store>& store, c
tsd.set_stream_descriptor(segment_in_memory.descriptor());
return {SegmentInMemory(std::move(segment_in_memory)), tsd, {}};
} catch (const std::exception& e) {
storage::raise<ErrorCode::E_KEY_NOT_FOUND>("Failed to read column stats key: {}", e.what());
// KeyNotFoundException (not the generic E_KEY_NOT_FOUND raise) so a concurrent-prune race is
// catchable by the read-retry primitive; both map to E_KEY_NOT_FOUND for callers.
throw storage::KeyNotFoundException(fmt::format("Failed to read column stats key: {}", e.what()));
}
}

Expand All @@ -2211,7 +2213,9 @@ ColumnStats get_column_stats_info_impl(const std::shared_ptr<Store>& store, cons
util::check(unpacked, "Failed to unpack column stats header while getting column stats info");
return ColumnStats{column_stats_header, tsd};
} catch (const std::exception& e) {
storage::raise<ErrorCode::E_KEY_NOT_FOUND>("Failed to read column stats key: {}", e.what());
// KeyNotFoundException (not the generic E_KEY_NOT_FOUND raise) so a concurrent-prune race is
// catchable by the read-retry primitive; both map to E_KEY_NOT_FOUND for callers.
throw storage::KeyNotFoundException(fmt::format("Failed to read column stats key: {}", e.what()));
}
}

Expand Down
21 changes: 21 additions & 0 deletions cpp/arcticdb/version/version_map.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,27 @@ class VersionMapImpl {
map_.clear();
}

// Returns true if the VERSION_REF head for stream_id differs from the cached head, in which
// case the cache is reloaded via storage_reload so the retry is a cache hit.
// Returns false if the ref is unchanged — missing data is genuine, not a write race.
bool invalidate_if_version_ref_changed(std::shared_ptr<Store> store, const StreamId& stream_id) {
VersionMapEntry ref_entry(stream_id);
read_symbol_ref(store, stream_id, ref_entry);

{
std::lock_guard lock(map_mutex_);
auto it = map_.find(stream_id);
if (it == map_.end())
return true;
if (it->second->head_ == ref_entry.head_)
return false;
}

static const LoadStrategy load_latest{LoadType::LATEST, LoadObjective::UNDELETED_ONLY};
storage_reload(store, stream_id, load_latest);
return true;
}

void load_via_iteration(
std::shared_ptr<Store> store, const StreamId& stream_id, std::shared_ptr<VersionMapEntry>& entry,
bool use_index_keys_for_iteration = false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
import pytest
from multiprocessing import Process, Queue

from tests.util.mark import RUNS_ON_GITHUB


@pytest.fixture
def writer_store(lmdb_version_store_delayed_deletes_v2):
Expand All @@ -13,6 +15,16 @@ def reader_store(lmdb_version_store_delayed_deletes_v2):
return lmdb_version_store_delayed_deletes_v2


@pytest.fixture
def eager_writer_store(lmdb_version_store_v2):
    """Store handed to the writer side of the eager-prune test.

    Backed by the ``lmdb_version_store_v2`` fixture rather than the delayed-deletes variant
    used by ``writer_store``.
    """
    store = lmdb_version_store_v2
    return store


@pytest.fixture
def eager_reader_store(lmdb_version_store_v2):
    """Store handed to the reader side of the eager-prune test.

    Backed by the ``lmdb_version_store_v2`` fixture rather than the delayed-deletes variant
    used by ``reader_store``.
    """
    store = lmdb_version_store_v2
    return store


def read_repeatedly(version_store, queue: Queue):
while True:
try:
Expand Down Expand Up @@ -47,3 +59,30 @@ def test_concurrent_read_write(writer_store, reader_store):
reader.terminate()

assert exceptions_in_reader.empty()


@pytest.mark.skipif(
    RUNS_ON_GITHUB, reason="Non-deterministic timing; covered by deterministic unit tests in test_read_retry.py"
)
def test_concurrent_read_write_eager_prune(eager_writer_store, eager_reader_store):
    """Reads must keep succeeding while another process writes and eagerly prunes.

    With delayed deletes off, ``prune_previous`` physically removes a superseded version as
    soon as its replacement is written, so a reader can resolve a version just before it is
    pruned from under it. The read path is expected to retry in that situation: on finding the
    data/index keys gone it re-resolves to the current version and reads again. The reader
    process should therefore never report a missing-key error into the queue, even with a
    writer pruning in a tight loop.
    """
    # Seed the symbol so the reader has something to read from its first iteration.
    eager_writer_store.write("sym", [1, 2, 3], prune_previous_version=True)

    reader_errors = Queue()
    reader_proc = Process(target=read_repeatedly, args=(eager_reader_store, reader_errors))
    writer_proc = Process(target=write_repeatedly, args=(eager_writer_store,))

    try:
        reader_proc.start()
        writer_proc.start()
        # Let the two processes race for up to five seconds; the writer gets only a token wait.
        reader_proc.join(timeout=5)
        writer_proc.join(timeout=0.001)
    finally:
        # Always stop both workers, writer first, whatever happened above.
        writer_proc.terminate()
        reader_proc.terminate()

    assert reader_errors.empty()

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It feels like this test might be flaky. It doesn't seem impossible for the writer process to invalidate the reader multiple times.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed, this test will not be reliable; it would be better to use the storage failure simulator for this sort of thing.

Loading
Loading