From 1a9dc09596a241dc702d7feea1171fbe742e7baf Mon Sep 17 00:00:00 2001 From: Yixin Luo <18810541851@163.com> Date: Wed, 13 May 2026 11:26:24 +0800 Subject: [PATCH] [Enhancement] Split publish-trace SST miss counters by local-cache vs remote (#73087) Signed-off-by: luohaha <18810541851@163.com> --- be/src/fs/bundle_file.cpp | 4 + be/src/fs/bundle_file.h | 1 + be/src/fs/fs_starlet.cpp | 14 + be/src/io/CMakeLists.txt | 1 + be/src/io/compressed_input_stream.cpp | 9 + be/src/io/compressed_input_stream.h | 4 + be/src/io/input_stream.cpp | 30 ++ be/src/io/input_stream.h | 35 ++ be/src/io/seekable_input_stream.cpp | 5 + be/src/io/seekable_input_stream.h | 2 + be/src/io/shared_buffered_input_stream.cpp | 5 + be/src/io/shared_buffered_input_stream.h | 2 + .../storage/lake/persistent_index_sstable.cpp | 33 ++ be/test/fs/fs_starlet_test.cpp | 134 ++++++ be/test/io/compressed_input_stream_test.cpp | 36 ++ .../lake/persistent_index_sstable_test.cpp | 393 ++++++++++++++++++ 16 files changed, 708 insertions(+) create mode 100644 be/src/io/input_stream.cpp diff --git a/be/src/fs/bundle_file.cpp b/be/src/fs/bundle_file.cpp index a39c64b0de99e9..28b859fbdf74db 100644 --- a/be/src/fs/bundle_file.cpp +++ b/be/src/fs/bundle_file.cpp @@ -143,6 +143,10 @@ StatusOr> BundleSeekableInputStream::get_ return _stream->get_numeric_statistics(); } +io::IoStatsSnapshot BundleSeekableInputStream::get_io_stats_snapshot() const { + return _stream->get_io_stats_snapshot(); +} + const std::string& BundleSeekableInputStream::filename() const { return _stream->filename(); } diff --git a/be/src/fs/bundle_file.h b/be/src/fs/bundle_file.h index b39e13a5c895bd..0c4bdc03dcfa1d 100644 --- a/be/src/fs/bundle_file.h +++ b/be/src/fs/bundle_file.h @@ -109,6 +109,7 @@ class BundleSeekableInputStream final : public io::SeekableInputStream { StatusOr read(void* data, int64_t count) override; Status touch_cache(int64_t offset, size_t length) override; StatusOr> get_numeric_statistics() override; + io::IoStatsSnapshot 
get_io_stats_snapshot() const override; // Override so two slices of the same physical file produce distinct keys at the same // stream-relative offset: we fold the slice base offset into the key, yielding a key diff --git a/be/src/fs/fs_starlet.cpp b/be/src/fs/fs_starlet.cpp index ab8846e9f57069..302825980ddaa0 100644 --- a/be/src/fs/fs_starlet.cpp +++ b/be/src/fs/fs_starlet.cpp @@ -217,6 +217,20 @@ class StarletInputStream : public starrocks::io::SeekableInputStream { return std::move(stats); } + io::IoStatsSnapshot get_io_stats_snapshot() const override { + auto stream_st = _file_ptr->stream(); + if (!stream_st.ok()) { + return {}; + } + const auto& s = (*stream_st)->get_io_stats(); + return { + s.bytes_read_local_disk, s.bytes_read_remote, s.bytes_write_local_disk, s.bytes_write_remote, + s.io_count_local_disk, s.io_count_remote, s.io_ns_read_local_disk, s.io_ns_read_remote, + s.io_ns_write_local_disk, s.io_ns_write_remote, s.prefetch_hit_count, s.prefetch_wait_finish_ns, + s.prefetch_pending_ns, + }; + } + private: ReadOnlyFilePtr _file_ptr; }; diff --git a/be/src/io/CMakeLists.txt b/be/src/io/CMakeLists.txt index 9879f62a331b9a..ca4fbea87c40ee 100644 --- a/be/src/io/CMakeLists.txt +++ b/be/src/io/CMakeLists.txt @@ -22,6 +22,7 @@ add_library(IO STATIC compressed_input_stream.cpp fd_output_stream.cpp fd_input_stream.cpp + input_stream.cpp io_profiler.cpp seekable_input_stream.cpp readable.cpp diff --git a/be/src/io/compressed_input_stream.cpp b/be/src/io/compressed_input_stream.cpp index b3610ca3b3d71d..4ac9f4abd0633e 100644 --- a/be/src/io/compressed_input_stream.cpp +++ b/be/src/io/compressed_input_stream.cpp @@ -106,4 +106,13 @@ Status CompressedInputStream::skip(int64_t n) { return Status::OK(); } +// Out-of-line so gcov can attribute coverage to a concrete `.cpp` line. 
+IoStatsSnapshot CompressedInputStream::get_io_stats_snapshot() const { + return _source_stream->get_io_stats_snapshot(); +} + +IoStatsSnapshot CompressedSeekableInputStream::get_io_stats_snapshot() const { + return _source->get_io_stats_snapshot(); +} + } // namespace starrocks::io diff --git a/be/src/io/compressed_input_stream.h b/be/src/io/compressed_input_stream.h index ab737111be2248..eac09dfa051513 100644 --- a/be/src/io/compressed_input_stream.h +++ b/be/src/io/compressed_input_stream.h @@ -46,6 +46,8 @@ class CompressedInputStream final : public InputStream { return _source_stream->get_numeric_statistics(); } + IoStatsSnapshot get_io_stats_snapshot() const override; + private: // Used to store the compressed data read from |_source_stream|. class CompressedBuffer { @@ -94,6 +96,8 @@ class CompressedSeekableInputStream final : public SeekableInputStream { return _source->get_numeric_statistics(); } + IoStatsSnapshot get_io_stats_snapshot() const override; + Status seek(int64_t position) override { return Status::NotSupported(""); } StatusOr position() override { return Status::NotSupported(""); } StatusOr get_size() override { return Status::NotSupported(""); } diff --git a/be/src/io/input_stream.cpp b/be/src/io/input_stream.cpp new file mode 100644 index 00000000000000..136b1be454990c --- /dev/null +++ b/be/src/io/input_stream.cpp @@ -0,0 +1,30 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +#include "io/input_stream.h" + +namespace starrocks::io { + +// Out-of-line so gcov can attribute coverage to a concrete `.cpp` line: header-only +// inline definitions get inlined at every call site and the resulting `.gcda` data +// is keyed off the call site, leaving the header line counted as 0/0. +IoStatsSnapshot InputStream::get_io_stats_snapshot() const { + return {}; +} + +IoStatsSnapshot InputStreamWrapper::get_io_stats_snapshot() const { + return _impl->get_io_stats_snapshot(); +} + +} // namespace starrocks::io diff --git a/be/src/io/input_stream.h b/be/src/io/input_stream.h index 957a49927e6161..6a7ba51ac3a9d4 100644 --- a/be/src/io/input_stream.h +++ b/be/src/io/input_stream.h @@ -24,6 +24,32 @@ namespace starrocks::io { class NumericStatistics; +// Plain-old-data snapshot of cumulative read/write IO counters. Cheap drop-in alternative +// to `get_numeric_statistics()` for hot paths that only need a fixed set of counters +// (e.g. publish-time tracing): a single struct copy with no heap allocation, no string +// construction, and no vector growth. Streams that do not expose IO breakdown leave every +// field at zero. Adding a counter only requires extending this struct and the relevant +// concrete-stream override; the abstract interface stays the same. 
+struct IoStatsSnapshot { + // bytes + int64_t bytes_read_local_disk = 0; + int64_t bytes_read_remote = 0; + int64_t bytes_write_local_disk = 0; + int64_t bytes_write_remote = 0; + // op counts + int64_t io_count_local_disk = 0; + int64_t io_count_remote = 0; + // time (ns) + int64_t io_ns_read_local_disk = 0; + int64_t io_ns_read_remote = 0; + int64_t io_ns_write_local_disk = 0; + int64_t io_ns_write_remote = 0; + // prefetch + int64_t prefetch_hit_count = 0; + int64_t prefetch_wait_finish_ns = 0; + int64_t prefetch_pending_ns = 0; +}; + // InputStream is the superclass of all classes representing an input stream of bytes. class InputStream : public Readable { public: @@ -50,6 +76,13 @@ class InputStream : public Readable { // If the InputStream implementation doesn't support statistics, a null pointer or // an empty statistics is returned. virtual StatusOr> get_numeric_statistics() { return nullptr; } + + // Cheap fixed-shape variant of `get_numeric_statistics()`. Default returns all zeros. + // Prefer this over `get_numeric_statistics()` on hot paths (publish, multi_get, …) — + // it avoids the heap-allocated NumericStatistics object and the 11 std::string keys + // built by adapter streams. Out-of-line so gcov can credit a `.cpp` line; see + // `input_stream.cpp` for the definition. + virtual IoStatsSnapshot get_io_stats_snapshot() const; }; class InputStreamWrapper : public InputStream { @@ -81,6 +114,8 @@ class InputStreamWrapper : public InputStream { return _impl->get_numeric_statistics(); } + IoStatsSnapshot get_io_stats_snapshot() const override; + private: InputStream* _impl; Ownership _ownership; diff --git a/be/src/io/seekable_input_stream.cpp b/be/src/io/seekable_input_stream.cpp index 9e23191577b6f2..b62fe98fcb6642 100644 --- a/be/src/io/seekable_input_stream.cpp +++ b/be/src/io/seekable_input_stream.cpp @@ -20,6 +20,11 @@ namespace starrocks::io { +// Out-of-line so gcov can attribute coverage to a concrete `.cpp` line. 
+IoStatsSnapshot SeekableInputStreamWrapper::get_io_stats_snapshot() const { + return _impl->get_io_stats_snapshot(); +} + StatusOr SeekableInputStream::read_at(int64_t offset, void* data, int64_t count) { RETURN_IF_ERROR(seek(offset)); return read(data, count); diff --git a/be/src/io/seekable_input_stream.h b/be/src/io/seekable_input_stream.h index bb0aacd19a9358..1e681446001678 100644 --- a/be/src/io/seekable_input_stream.h +++ b/be/src/io/seekable_input_stream.h @@ -130,6 +130,8 @@ class SeekableInputStreamWrapper : public SeekableInputStream { return _impl->get_numeric_statistics(); } + IoStatsSnapshot get_io_stats_snapshot() const override; + StatusOr position() override { return _impl->position(); } StatusOr read_at(int64_t offset, void* out, int64_t count) override { diff --git a/be/src/io/shared_buffered_input_stream.cpp b/be/src/io/shared_buffered_input_stream.cpp index c3c37545cbc4e9..a59e6e5dc5287f 100644 --- a/be/src/io/shared_buffered_input_stream.cpp +++ b/be/src/io/shared_buffered_input_stream.cpp @@ -305,4 +305,9 @@ int64_t SharedBufferedInputStream::current_range_ref_sum() const { return ref; } +// Out-of-line so gcov can attribute coverage to a concrete `.cpp` line. 
+IoStatsSnapshot SharedBufferedInputStream::get_io_stats_snapshot() const { + return _stream->get_io_stats_snapshot(); +} + } // namespace starrocks::io diff --git a/be/src/io/shared_buffered_input_stream.h b/be/src/io/shared_buffered_input_stream.h index 30cfa718d4824a..91984ab52ad1de 100644 --- a/be/src/io/shared_buffered_input_stream.h +++ b/be/src/io/shared_buffered_input_stream.h @@ -77,6 +77,8 @@ class SharedBufferedInputStream : public SeekableInputStream { return _stream->get_numeric_statistics(); } + IoStatsSnapshot get_io_stats_snapshot() const override; + Status set_io_ranges(const std::vector& ranges, bool coalesce_lazy_column = true); void release_to_offset(int64_t offset); void release(); diff --git a/be/src/storage/lake/persistent_index_sstable.cpp b/be/src/storage/lake/persistent_index_sstable.cpp index 75a36003299feb..d9d60bd65cf5e2 100644 --- a/be/src/storage/lake/persistent_index_sstable.cpp +++ b/be/src/storage/lake/persistent_index_sstable.cpp @@ -16,9 +16,12 @@ #include // NOLINT +#include + #include "fs/fs.h" #include "fs/key_cache.h" #include "gen_cpp/types.pb.h" +#include "io/input_stream.h" #include "storage/lake/lake_delvec_loader.h" #include "storage/lake/utils.h" #include "storage/sstable/table_builder.h" @@ -30,6 +33,17 @@ namespace starrocks::lake { namespace { +// Cheap drop-in for `get_numeric_statistics()` on the publish hot path: a single +// virtual call returning a 13×int64 struct, no heap alloc / no strings / no vector. +// Plain POSIX (shared-nothing UT) returns all zero via the base default. Callers +// always pass non-null `_rf.get()` (set in `init()`) or non-null fresh `rf.get()` +// (from `ASSIGN_OR_RETURN`), so the only guard we need is against the rare case +// where the underlying file's stream pointer comes back null. +io::IoStatsSnapshot take_sstable_io_snapshot(RandomAccessFile* rf) { + auto stream = rf->stream(); + return stream ? 
stream->get_io_stats_snapshot() : io::IoStatsSnapshot{}; +} + Status drop_corrupted_sstable_cache(const std::string& path) { #if defined(USE_STAROS) && !defined(BUILD_FORMAT_LIB) if (!config::lake_clear_corrupted_cache_data) { @@ -157,6 +171,11 @@ Status PersistentIndexSstable::multi_get(const Slice* keys, const KeyIndexSet& k ASSIGN_OR_RETURN(rf, fs::new_random_access_file(opts, _rf->filename())); } options.file = rf.get(); + // When parallel execution opens a fresh `rf`, its IO counters start at 0 so the delta below + // equals the absolute IO done by this multi_get. Otherwise `_rf` is reused across calls and + // we measure the delta against its running totals. + RandomAccessFile* active_rf = (rf != nullptr) ? rf.get() : _rf.get(); + io::IoStatsSnapshot io_snap_before = take_sstable_io_snapshot(active_rf); // Currently, there is no need to set predicate for MultiGet of persistent index sstable. Because predicate // only used for sstable compaction to filter out some keys for tablet split purpose and such keys can not // be read by the persistent index by designed. So even we provide a predicate, all keys read by multi_get @@ -178,9 +197,23 @@ Status PersistentIndexSstable::multi_get(const Slice* keys, const KeyIndexSet& k return multiget_st; } auto end_ts = butil::gettimeofday_us(); + io::IoStatsSnapshot io_snap_after = take_sstable_io_snapshot(active_rf); TRACE_COUNTER_INCREMENT("multi_get_us", end_ts - start_ts); TRACE_COUNTER_INCREMENT("read_block_hit_cache_cnt", stat.block_cnt_from_cache); TRACE_COUNTER_INCREMENT("read_block_miss_cache_cnt", stat.block_cnt_from_file); + // Break down the misses into reads served by the local data cache vs. reads that went out to + // the remote object store (S3/OSS/etc.). Deltas are clamped at 0 because cumulative counters + // should never decrease, but we guard against the file being swapped underneath us in retries. 
+ TRACE_COUNTER_INCREMENT( + "sstable_io_local_disk_bytes", + std::max(0, io_snap_after.bytes_read_local_disk - io_snap_before.bytes_read_local_disk)); + TRACE_COUNTER_INCREMENT("sstable_io_remote_bytes", + std::max(0, io_snap_after.bytes_read_remote - io_snap_before.bytes_read_remote)); + TRACE_COUNTER_INCREMENT( + "sstable_io_count_local_disk", + std::max(0, io_snap_after.io_count_local_disk - io_snap_before.io_count_local_disk)); + TRACE_COUNTER_INCREMENT("sstable_io_count_remote", + std::max(0, io_snap_after.io_count_remote - io_snap_before.io_count_remote)); size_t i = 0; for (auto& key_index : key_indexes) { // Index_value_with_vers is empty means key is not found in sst. diff --git a/be/test/fs/fs_starlet_test.cpp b/be/test/fs/fs_starlet_test.cpp index 33d37e4cedb300..39fe1242ef281c 100644 --- a/be/test/fs/fs_starlet_test.cpp +++ b/be/test/fs/fs_starlet_test.cpp @@ -178,6 +178,14 @@ TEST_P(StarletFileSystemTest, test_write_and_read) { ASSIGN_OR_ABORT(auto stats2, rf->get_numeric_statistics()); EXPECT_EQ((*stats2).size(), 11); EXPECT_EQ("hello world!", std::string_view(buf, nr)); + // Exercise the lightweight `get_io_stats_snapshot()` path used by the publish + // trace. After the read above the snapshot must reflect non-zero byte/op + // counters on either the local-disk (cache-hit) or remote (cache-miss) side, + // and the underlying StarletInputStream override must be reached via the + // SeekableInputStreamWrapper forward. 
+ auto io_snap = rf->get_io_stats_snapshot(); + EXPECT_GT(io_snap.bytes_read_local_disk + io_snap.bytes_read_remote, 0); + EXPECT_GT(io_snap.io_count_local_disk + io_snap.io_count_remote, 0); ASSIGN_OR_ABORT(nr, rf->read_at(3, buf, sizeof(buf))); EXPECT_OK(rf->touch_cache(0 /* offset */, sizeof("hello world!"))); @@ -554,6 +562,132 @@ class NewFsStarletTest : public ::testing::Test { } }; +// Minimal mock InputStream whose only useful operation is `get_io_stats()` — +// returns a pre-populated `IOStats` with distinct sentinel values per field, so +// the corresponding fields surfacing through `StarletInputStream::get_io_stats_snapshot()` +// can be matched 1:1. +class SnapshotMockInputStream : public staros::starlet::fslib::InputStream { +public: + SnapshotMockInputStream() { + get_mutable_io_stats().bytes_read_local_disk = 11; + get_mutable_io_stats().bytes_read_remote = 12; + get_mutable_io_stats().bytes_write_local_disk = 13; + get_mutable_io_stats().bytes_write_remote = 14; + get_mutable_io_stats().io_count_local_disk = 21; + get_mutable_io_stats().io_count_remote = 22; + get_mutable_io_stats().io_ns_read_local_disk = 31; + get_mutable_io_stats().io_ns_read_remote = 32; + get_mutable_io_stats().io_ns_write_local_disk = 33; + get_mutable_io_stats().io_ns_write_remote = 34; + get_mutable_io_stats().prefetch_hit_count = 41; + get_mutable_io_stats().prefetch_wait_finish_ns = 42; + get_mutable_io_stats().prefetch_pending_ns = 43; + } + + absl::StatusOr seek(int64_t /*offset*/, Anchor /*anchor*/) override { return 0; } + absl::StatusOr tell() override { return 0; } + absl::StatusOr size() override { return 0; } + absl::Status close() override { return absl::OkStatus(); } + absl::StatusOr read(void* /*data*/, size_t /*length*/) override { return 0; } +}; + +class SnapshotMockReadOnlyFile : public staros::starlet::fslib::ReadOnlyFile { +public: + explicit SnapshotMockReadOnlyFile(bool stream_fails = false) + : _stream(std::make_unique()), + _name("mock_io_stats_file"), 
+ _stream_fails(stream_fails) {} + + const std::string& name() override { return _name; } + absl::StatusOr size() override { return 0; } + absl::StatusOr get_meta(std::string_view /*key*/) override { + return absl::UnimplementedError("get_meta"); + } + absl::Status set_meta(std::string_view /*key*/, std::string_view /*value*/) override { + return absl::UnimplementedError("set_meta"); + } + absl::Status remove_meta(std::string_view /*key*/) override { return absl::UnimplementedError("remove_meta"); } + absl::Status close() override { return absl::OkStatus(); } + absl::StatusOr stream() override { + if (_stream_fails) { + return absl::InternalError("stream() failed (mock)"); + } + return _stream.get(); + } + +private: + std::unique_ptr _stream; + std::string _name; + bool _stream_fails; +}; + +class SnapshotMockFileSystem : public MockStarletFileSystem { +public: + explicit SnapshotMockFileSystem(bool stream_fails = false) : _stream_fails(stream_fails) {} + + absl::StatusOr> open( + std::string_view /*path*/, const staros::starlet::fslib::ReadOptions& /*opts*/) override { + return std::unique_ptr(new SnapshotMockReadOnlyFile(_stream_fails)); + } + +private: + bool _stream_fails; +}; + +// Exercises StarletInputStream::get_io_stats_snapshot() without needing real S3 +// credentials. The trace-driven publish path on shared-data clusters calls this +// method through the RandomAccessFile / SeekableInputStreamWrapper forward, so +// regressions here would silently zero out the local-vs-remote breakdown on +// production shared-data builds. 
+TEST_F(NewFsStarletTest, test_starlet_input_stream_get_io_stats_snapshot) { + auto mock_fs = std::make_shared(); + int64_t test_shard_id = 99999; + SyncPoint::GetInstance()->SetCallBack("new_fs_starlet::get_shard_filesystem", [&](void* arg) { + auto* fs_st = static_cast>*>(arg); + *fs_st = mock_fs; + }); + + auto fs = new_fs_starlet(test_shard_id, false); + ASSIGN_OR_ABORT(auto rf, fs->new_random_access_file(fmt::format("staros://{}/anyfile", test_shard_id))); + auto snap = rf->get_io_stats_snapshot(); + + EXPECT_EQ(11, snap.bytes_read_local_disk); + EXPECT_EQ(12, snap.bytes_read_remote); + EXPECT_EQ(13, snap.bytes_write_local_disk); + EXPECT_EQ(14, snap.bytes_write_remote); + EXPECT_EQ(21, snap.io_count_local_disk); + EXPECT_EQ(22, snap.io_count_remote); + EXPECT_EQ(31, snap.io_ns_read_local_disk); + EXPECT_EQ(32, snap.io_ns_read_remote); + EXPECT_EQ(33, snap.io_ns_write_local_disk); + EXPECT_EQ(34, snap.io_ns_write_remote); + EXPECT_EQ(41, snap.prefetch_hit_count); + EXPECT_EQ(42, snap.prefetch_wait_finish_ns); + EXPECT_EQ(43, snap.prefetch_pending_ns); +} + +// Covers the `stream_st not ok` early-return branch in StarletInputStream:: +// get_io_stats_snapshot(). With the mock fail flag set, ReadOnlyFile::stream() +// returns InternalError and the override must degrade gracefully to all-zero +// instead of dereferencing the failed StatusOr. 
+TEST_F(NewFsStarletTest, test_starlet_input_stream_get_io_stats_snapshot_stream_failure) { + auto mock_fs = std::make_shared(/*stream_fails=*/true); + int64_t test_shard_id = 99998; + SyncPoint::GetInstance()->SetCallBack("new_fs_starlet::get_shard_filesystem", [&](void* arg) { + auto* fs_st = static_cast>*>(arg); + *fs_st = mock_fs; + }); + + auto fs = new_fs_starlet(test_shard_id, false); + ASSIGN_OR_ABORT(auto rf, fs->new_random_access_file(fmt::format("staros://{}/anyfile", test_shard_id))); + auto snap = rf->get_io_stats_snapshot(); + + EXPECT_EQ(0, snap.bytes_read_local_disk); + EXPECT_EQ(0, snap.bytes_read_remote); + EXPECT_EQ(0, snap.io_count_local_disk); + EXPECT_EQ(0, snap.io_count_remote); +} + TEST_F(NewFsStarletTest, test_new_fs_starlet_with_s3_raw_path_mode_true) { auto mock_fs = std::make_shared(); int64_t test_shard_id = 88888; diff --git a/be/test/io/compressed_input_stream_test.cpp b/be/test/io/compressed_input_stream_test.cpp index 090a951c93d03f..eeeb11413da0be 100644 --- a/be/test/io/compressed_input_stream_test.cpp +++ b/be/test/io/compressed_input_stream_test.cpp @@ -276,4 +276,40 @@ TEST_F(CompressedInputStreamTest, test_Snappy1) { } } +namespace { +// Minimal InputStream whose get_io_stats_snapshot() returns a recognizable +// sentinel byte count, used by the forwarding test below. +class SentinelInputStream : public InputStream { +public: + static constexpr int64_t kSentinel = 0x1A2B3C4D; + + StatusOr read(void* /*data*/, int64_t /*count*/) override { return 0; } + Status read_fully(void* /*data*/, int64_t /*count*/) override { return Status::OK(); } + Status skip(int64_t /*count*/) override { return Status::OK(); } + + IoStatsSnapshot get_io_stats_snapshot() const override { + IoStatsSnapshot snap; + snap.bytes_read_local_disk = kSentinel; + return snap; + } +}; +} // namespace + +// CompressedInputStream and CompressedSeekableInputStream both implement +// get_io_stats_snapshot() as a single-line forward to the inner stream. 
Anyone +// removing the override would silently downgrade the publish-trace counters +// (which call get_io_stats_snapshot through whatever wrapper chain is in play) +// to all-zero on compressed streams. Verify the sentinel passed via the inner +// stream survives both wrappers. +TEST_F(CompressedInputStreamTest, test_io_stats_snapshot_forwarding) { + auto sentinel_inner = std::make_shared(); + + CompressedInputStream cis(sentinel_inner, LZ4F_decompressor()); + EXPECT_EQ(SentinelInputStream::kSentinel, cis.get_io_stats_snapshot().bytes_read_local_disk); + + auto cis_shared = std::make_shared(sentinel_inner, LZ4F_decompressor()); + CompressedSeekableInputStream csis(cis_shared); + EXPECT_EQ(SentinelInputStream::kSentinel, csis.get_io_stats_snapshot().bytes_read_local_disk); +} + } // namespace starrocks::io diff --git a/be/test/storage/lake/persistent_index_sstable_test.cpp b/be/test/storage/lake/persistent_index_sstable_test.cpp index 6b9c887279703d..f759b1d3c2ad73 100644 --- a/be/test/storage/lake/persistent_index_sstable_test.cpp +++ b/be/test/storage/lake/persistent_index_sstable_test.cpp @@ -17,11 +17,17 @@ #include #include +#include #include +#include #include "common/config.h" +#include "fs/bundle_file.h" #include "fs/fs.h" #include "fs/fs_util.h" +#include "io/input_stream.h" +#include "io/seekable_input_stream.h" +#include "io/shared_buffered_input_stream.h" #include "storage/lake/fixed_location_provider.h" #include "storage/lake/join_path.h" #include "storage/lake/location_provider.h" @@ -35,9 +41,12 @@ #include "storage/sstable/table_builder.h" #include "testutil/assert.h" #include "testutil/sync_point.h" +#include "util/defer_op.h" #include "util/lru_cache.h" #include "util/phmap/btree.h" #include "util/starrocks_metrics.h" +#include "util/trace.h" +#include "util/trace_metrics.h" namespace starrocks::lake { @@ -921,4 +930,388 @@ TEST_F(PersistentIndexSstableTest, test_multi_get_preserves_tombstones_with_shar } } +namespace { + +// Wraps an underlying 
SeekableInputStream and reports synthetic IO statistics so a +// shared-nothing UT can drive the local-disk vs remote breakdown that production +// gets from starlet's CacheFs. Every successful read_at_fully contributes its byte +// count to one bucket, selected by the mode passed at construction. +class FakeStatsInputStream : public io::SeekableInputStreamWrapper { +public: + enum Mode { kAllLocal, kAllRemote }; + + FakeStatsInputStream(std::shared_ptr inner, Mode mode) + : io::SeekableInputStreamWrapper(inner.get(), kDontTakeOwnership), _inner(std::move(inner)), _mode(mode) {} + + Status read_at_fully(int64_t offset, void* out, int64_t count) override { + RETURN_IF_ERROR(_inner->read_at_fully(offset, out, count)); + _bytes_read += count; + ++_io_count; + return Status::OK(); + } + + StatusOr read(void* data, int64_t count) override { + auto res = _inner->read(data, count); + if (res.ok()) { + _bytes_read += *res; + ++_io_count; + } + return res; + } + + io::IoStatsSnapshot get_io_stats_snapshot() const override { + io::IoStatsSnapshot snap; + if (_mode == kAllRemote) { + snap.bytes_read_remote = _bytes_read; + snap.io_count_remote = _io_count; + } else { + snap.bytes_read_local_disk = _bytes_read; + snap.io_count_local_disk = _io_count; + } + return snap; + } + +private: + std::shared_ptr _inner; + Mode _mode; + int64_t _bytes_read{0}; + int64_t _io_count{0}; +}; + +// Simulates the rare `_rf`-swap case where the second snapshot sees smaller counters +// than the first (e.g., when retry-after-corruption replaces the underlying file). +// First snapshot call returns inflated values; every subsequent call returns all zero. +// `multi_get` must clamp the resulting negative delta to zero before incrementing the +// trace metric — otherwise the metric would record a negative number. 
+class SwappingStatsInputStream : public io::SeekableInputStreamWrapper { +public: + static constexpr int64_t kInflatedBefore = 1 << 20; + + explicit SwappingStatsInputStream(std::shared_ptr inner) + : io::SeekableInputStreamWrapper(inner.get(), kDontTakeOwnership), _inner(std::move(inner)) {} + + Status read_at_fully(int64_t offset, void* out, int64_t count) override { + return _inner->read_at_fully(offset, out, count); + } + StatusOr read(void* data, int64_t count) override { return _inner->read(data, count); } + + io::IoStatsSnapshot get_io_stats_snapshot() const override { + io::IoStatsSnapshot snap; + if (_call_count++ == 0) { + snap.bytes_read_local_disk = kInflatedBefore; + snap.bytes_read_remote = kInflatedBefore; + snap.io_count_local_disk = kInflatedBefore; + snap.io_count_remote = kInflatedBefore; + } + return snap; + } + +private: + std::shared_ptr _inner; + mutable int _call_count = 0; +}; + +// Trace stores counters keyed by `const char*` (pointer comparison), and the +// literal we pass here may not share an address with the one inside +// persistent_index_sstable.cpp across translation units. Match by string value +// instead so the test is robust against the compiler's string pooling decisions. +int64_t find_trace_metric(const std::map& metrics, std::string_view name) { + for (const auto& [k, v] : metrics) { + if (k != nullptr && name == k) { + return v; + } + } + return 0; +} + +void run_multi_get_io_breakdown_case(const std::string& test_dir, const std::string& filename, + FakeStatsInputStream::Mode mode) { + // Force the non-parallel `multi_get` code path so the read goes through the wrapped `_rf` + // we inject below. With parallel execution on (the default), multi_get opens a fresh + // RandomAccessFile via `fs::new_random_access_file` and bypasses our fake entirely; that + // production-only branch exercises the same snapshot helper against the real starlet + // stream and is out of scope for a shared-nothing UT. 
+ bool saved_parallel = config::enable_pk_index_parallel_execution; + config::enable_pk_index_parallel_execution = false; + DeferOp restore_parallel([&] { config::enable_pk_index_parallel_execution = saved_parallel; }); + constexpr int kN = 256; // big enough that MultiGet actually reads data blocks from the file + ASSIGN_OR_ABORT(auto wf, fs::new_writable_file(lake::join_path(test_dir, filename))); + phmap::btree_map> map; + for (int i = 0; i < kN; ++i) { + map.emplace(fmt::format("io_key_{:08d}", i), std::make_pair(int64_t(1), IndexValue(i))); + } + uint64_t filesize = 0; + PersistentIndexSstableRangePB range_pb; + ASSERT_OK(PersistentIndexSstable::build_sstable(map, wf.get(), &filesize, &range_pb)); + ASSERT_OK(wf->close()); + + ASSIGN_OR_ABORT(auto read_file, fs::new_random_access_file(lake::join_path(test_dir, filename))); + auto inner_stream = read_file->stream(); + auto fake_stream = std::make_shared(inner_stream, mode); + auto wrapped_file = std::make_unique(fake_stream, read_file->filename()); + + auto sst = std::make_unique(); + std::unique_ptr cache(new_lru_cache(1024)); + PersistentIndexSstablePB sstable_pb; + sstable_pb.set_filename(filename); + sstable_pb.set_filesize(filesize); + sstable_pb.mutable_range()->CopyFrom(range_pb); + sstable_pb.mutable_fileset_id()->CopyFrom(UniqueId::gen_uid().to_proto()); + ASSERT_OK(sst->init(std::move(wrapped_file), sstable_pb, cache.get())); + + std::vector key_str(kN); + std::vector keys(kN); + std::vector values(kN, IndexValue(NullIndexValue)); + KeyIndexSet key_indexes; + KeyIndexSet found; + for (int i = 0; i < kN; ++i) { + key_str[i] = fmt::format("io_key_{:08d}", i); + keys[i] = Slice(key_str[i]); + key_indexes.insert(i); + } + + scoped_refptr trace(new Trace); + { + ADOPT_TRACE(trace.get()); + ASSERT_OK(sst->multi_get(keys.data(), key_indexes, -1, values.data(), &found)); + } + ASSERT_EQ(found.size(), static_cast(kN)); + + auto metrics = trace->metrics()->Get(); + int64_t miss_cnt = find_trace_metric(metrics, 
"read_block_miss_cache_cnt"); + int64_t local_bytes = find_trace_metric(metrics, "sstable_io_local_disk_bytes"); + int64_t remote_bytes = find_trace_metric(metrics, "sstable_io_remote_bytes"); + int64_t local_count = find_trace_metric(metrics, "sstable_io_count_local_disk"); + int64_t remote_count = find_trace_metric(metrics, "sstable_io_count_remote"); + + // The sstable layer must have missed its in-memory block cache at least once for + // the new counters to mean anything; otherwise the case isn't exercising the path. + ASSERT_GT(miss_cnt, 0) << "MultiGet didn't trigger any sstable block file reads"; + + if (mode == FakeStatsInputStream::kAllRemote) { + EXPECT_EQ(0, local_bytes); + EXPECT_EQ(0, local_count); + EXPECT_GT(remote_bytes, 0); + EXPECT_GT(remote_count, 0); + } else { + EXPECT_GT(local_bytes, 0); + EXPECT_GT(local_count, 0); + EXPECT_EQ(0, remote_bytes); + EXPECT_EQ(0, remote_count); + } +} + +} // namespace + +TEST_F(PersistentIndexSstableTest, test_multi_get_io_breakdown_local_disk) { + run_multi_get_io_breakdown_case(kTestDir, "io_breakdown_local.sst", FakeStatsInputStream::kAllLocal); +} + +TEST_F(PersistentIndexSstableTest, test_multi_get_io_breakdown_remote) { + run_multi_get_io_breakdown_case(kTestDir, "io_breakdown_remote.sst", FakeStatsInputStream::kAllRemote); +} + +// When the underlying stream does not override get_io_stats_snapshot() (the common shared-nothing +// case), multi_get must still emit the new counters at zero rather than crash or skip +// them entirely. This guards the all-zero base-default path of take_sstable_io_snapshot. 
+TEST_F(PersistentIndexSstableTest, test_multi_get_io_breakdown_no_stats) { + constexpr int kN = 16; + const std::string filename = "io_breakdown_no_stats.sst"; + ASSIGN_OR_ABORT(auto wf, fs::new_writable_file(lake::join_path(kTestDir, filename))); + phmap::btree_map> map; + for (int i = 0; i < kN; ++i) { + map.emplace(fmt::format("ns_key_{:08d}", i), std::make_pair(int64_t(1), IndexValue(i))); + } + uint64_t filesize = 0; + PersistentIndexSstableRangePB range_pb; + ASSERT_OK(PersistentIndexSstable::build_sstable(map, wf.get(), &filesize, &range_pb)); + ASSERT_OK(wf->close()); + + auto sst = std::make_unique(); + ASSIGN_OR_ABORT(auto read_file, fs::new_random_access_file(lake::join_path(kTestDir, filename))); + std::unique_ptr cache(new_lru_cache(1024)); + PersistentIndexSstablePB sstable_pb; + sstable_pb.set_filename(filename); + sstable_pb.set_filesize(filesize); + sstable_pb.mutable_range()->CopyFrom(range_pb); + sstable_pb.mutable_fileset_id()->CopyFrom(UniqueId::gen_uid().to_proto()); + ASSERT_OK(sst->init(std::move(read_file), sstable_pb, cache.get())); + + std::vector key_str(kN); + std::vector keys(kN); + std::vector values(kN, IndexValue(NullIndexValue)); + KeyIndexSet key_indexes; + KeyIndexSet found; + for (int i = 0; i < kN; ++i) { + key_str[i] = fmt::format("ns_key_{:08d}", i); + keys[i] = Slice(key_str[i]); + key_indexes.insert(i); + } + + scoped_refptr trace(new Trace); + { + ADOPT_TRACE(trace.get()); + ASSERT_OK(sst->multi_get(keys.data(), key_indexes, -1, values.data(), &found)); + } + ASSERT_EQ(found.size(), static_cast(kN)); + + auto metrics = trace->metrics()->Get(); + EXPECT_EQ(0, find_trace_metric(metrics, "sstable_io_local_disk_bytes")); + EXPECT_EQ(0, find_trace_metric(metrics, "sstable_io_remote_bytes")); + EXPECT_EQ(0, find_trace_metric(metrics, "sstable_io_count_local_disk")); + EXPECT_EQ(0, find_trace_metric(metrics, "sstable_io_count_remote")); +} + +// Schema contract: every field of IoStatsSnapshot must zero-initialize so the base +// 
`InputStream::get_io_stats_snapshot()` default (`return {};`) yields an all-zero
+// snapshot and the trace counters degrade cleanly on streams that don't expose IO
+// breakdown. Loud, focused guard against someone later adding a field without a
+// default — TraceMetrics::Increment is a plain int64_t add, an uninitialized field
+// would silently leak whatever garbage was on the stack into the trace.
+TEST_F(PersistentIndexSstableTest, test_io_stats_snapshot_default_zeroed) {
+    // NOTE(review): if io::IoStatsSnapshot is an aggregate, `snap{}` value-initializes
+    // any member that lacks a default member initializer, so these checks would
+    // still pass for a field added without a default; they do catch a non-zero
+    // default. Code that writes `io::IoStatsSnapshot snap;` (no braces) is the case
+    // that depends on per-field defaults — confirm this matches the intent above.
+    io::IoStatsSnapshot snap{};
+    EXPECT_EQ(0, snap.bytes_read_local_disk);
+    EXPECT_EQ(0, snap.bytes_read_remote);
+    EXPECT_EQ(0, snap.bytes_write_local_disk);
+    EXPECT_EQ(0, snap.bytes_write_remote);
+    EXPECT_EQ(0, snap.io_count_local_disk);
+    EXPECT_EQ(0, snap.io_count_remote);
+    EXPECT_EQ(0, snap.io_ns_read_local_disk);
+    EXPECT_EQ(0, snap.io_ns_read_remote);
+    EXPECT_EQ(0, snap.io_ns_write_local_disk);
+    EXPECT_EQ(0, snap.io_ns_write_remote);
+    EXPECT_EQ(0, snap.prefetch_hit_count);
+    EXPECT_EQ(0, snap.prefetch_wait_finish_ns);
+    EXPECT_EQ(0, snap.prefetch_pending_ns);
+}
+
+// `multi_get`'s retry-after-corruption path can replace the underlying file mid-call.
+// When that happens the after-snapshot reads a fresh, all-zero counter set while
+// before-snapshot still holds the prior file's running totals — the naive delta is
+// negative. SwappingStatsInputStream simulates this by returning inflated values on
+// the first snapshot call and zero on subsequent calls. The trace metrics must stay
+// non-negative; without the `std::max(0, …)` clamp they would record the
+// negative delta directly via TraceMetrics::Increment (plain int64 add, no floor).
+TEST_F(PersistentIndexSstableTest, test_multi_get_io_breakdown_clamps_negative_delta) {
+    // Force the non-parallel setting for this test and put the previous value back
+    // when restore_parallel goes out of scope.
+    // NOTE(review): the reason for disabling parallel execution is not stated here;
+    // presumably it keeps multi_get on the thread that adopted the trace — confirm.
+    bool saved_parallel = config::enable_pk_index_parallel_execution;
+    config::enable_pk_index_parallel_execution = false;
+    DeferOp restore_parallel([&] { config::enable_pk_index_parallel_execution = saved_parallel; });
+
+    // NOTE(review): template argument lists look stripped in this copy of the patch
+    // (e.g. `std::make_shared(read_file->stream())`, `std::make_unique(fake_stream, ...)`,
+    // `scoped_refptr trace`) — restore them from the upstream commit before applying.
+    constexpr int kN = 16;
+    const std::string filename = "io_breakdown_clamp.sst";
+    ASSIGN_OR_ABORT(auto wf, fs::new_writable_file(lake::join_path(kTestDir, filename)));
+    phmap::btree_map> map;
+    for (int i = 0; i < kN; ++i) {
+        map.emplace(fmt::format("clamp_key_{:08d}", i), std::make_pair(int64_t(1), IndexValue(i)));
+    }
+    uint64_t filesize = 0;
+    PersistentIndexSstableRangePB range_pb;
+    ASSERT_OK(PersistentIndexSstable::build_sstable(map, wf.get(), &filesize, &range_pb));
+    ASSERT_OK(wf->close());
+
+    // Open the real file, then hand the sstable a file object built around
+    // fake_stream, which wraps the real file's stream. The stats-swapping stream
+    // type is defined outside this chunk; per the header comment above it is
+    // SwappingStatsInputStream.
+    ASSIGN_OR_ABORT(auto read_file, fs::new_random_access_file(lake::join_path(kTestDir, filename)));
+    auto fake_stream = std::make_shared(read_file->stream());
+    auto wrapped_file = std::make_unique(fake_stream, read_file->filename());
+
+    auto sst = std::make_unique();
+    std::unique_ptr cache(new_lru_cache(1024));
+    PersistentIndexSstablePB sstable_pb;
+    sstable_pb.set_filename(filename);
+    sstable_pb.set_filesize(filesize);
+    sstable_pb.mutable_range()->CopyFrom(range_pb);
+    sstable_pb.mutable_fileset_id()->CopyFrom(UniqueId::gen_uid().to_proto());
+    ASSERT_OK(sst->init(std::move(wrapped_file), sstable_pb, cache.get()));
+
+    // key_str owns the key bytes; keys[i] is a Slice built from key_str[i].
+    std::vector key_str(kN);
+    std::vector keys(kN);
+    std::vector values(kN, IndexValue(NullIndexValue));
+    KeyIndexSet key_indexes;
+    KeyIndexSet found;
+    for (int i = 0; i < kN; ++i) {
+        key_str[i] = fmt::format("clamp_key_{:08d}", i);
+        keys[i] = Slice(key_str[i]);
+        key_indexes.insert(i);
+    }
+
+    // The trace is adopted only around multi_get; its metrics are inspected below.
+    scoped_refptr trace(new Trace);
+    {
+        ADOPT_TRACE(trace.get());
+        ASSERT_OK(sst->multi_get(keys.data(), key_indexes, -1, values.data(), &found));
+    }
+    ASSERT_EQ(found.size(), static_cast(kN));
+
+    auto metrics = trace->metrics()->Get();
+    // Inflated-before minus zero-after would be roughly -kInflatedBefore per counter
+    // if the clamp were removed. We assert non-negative — any positive residual
+    // (e.g., from MultiGet's own reads on the inner stream) is acceptable because
+    // SwappingStatsInputStream doesn't track real reads at all; it just produces
+    // a deterministic before>after pair.
+    EXPECT_GE(find_trace_metric(metrics, "sstable_io_local_disk_bytes"), 0);
+    EXPECT_GE(find_trace_metric(metrics, "sstable_io_remote_bytes"), 0);
+    EXPECT_GE(find_trace_metric(metrics, "sstable_io_count_local_disk"), 0);
+    EXPECT_GE(find_trace_metric(metrics, "sstable_io_count_remote"), 0);
+}
+
+namespace {
+
+// Minimal seekable stream whose `get_io_stats_snapshot()` returns a recognizable
+// sentinel. Used by the wrapper-forwarding test below to verify each wrapper
+// layer in the InputStream hierarchy passes the call straight to its inner.
+class SentinelSeekableStream : public io::SeekableInputStream {
+public:
+    // Value reported as bytes_read_local_disk; the forwarding test compares against it.
+    static constexpr int64_t kSentinel = 0xC0DE;
+
+    // The data-path overrides are stubs: they never touch a buffer and return
+    // fixed values (0 bytes read, OK status, position 0, size 1).
+    // NOTE(review): the `StatusOr` return types look like they lost their template
+    // arguments in this copy of the patch — restore from the upstream commit.
+    StatusOr read(void* /*data*/, int64_t /*count*/) override { return 0; }
+    Status read_fully(void* /*data*/, int64_t /*count*/) override { return Status::OK(); }
+    Status seek(int64_t /*position*/) override { return Status::OK(); }
+    StatusOr position() override { return 0; }
+    StatusOr get_size() override { return 1; }
+
+    // Only bytes_read_local_disk is assigned; `snap` is declared without braces, so
+    // every other field holds whatever default io::IoStatsSnapshot declares for it.
+    io::IoStatsSnapshot get_io_stats_snapshot() const override {
+        io::IoStatsSnapshot snap;
+        snap.bytes_read_local_disk = kSentinel;
+        return snap;
+    }
+};
+
+} // namespace
+
+// Trivial wrappers in the InputStream / SeekableInputStream hierarchy must pass
+// `get_io_stats_snapshot()` straight through to the wrapped stream — they are
+// 1-line forwarders and easy to break silently. 
This test wraps a sentinel-
+// emitting stream in each of:
+//   - io::InputStreamWrapper (be/src/io/input_stream.h)
+//   - io::SeekableInputStreamWrapper (be/src/io/seekable_input_stream.h)
+//   - io::SharedBufferedInputStream (be/src/io/shared_buffered_input_stream.h)
+//   - BundleSeekableInputStream (be/src/fs/bundle_file.h)
+// and asserts the sentinel byte count survives the forward. CompressedInputStream
+// is left out because its constructor needs a real StreamDecompressor and the
+// forwarder there is the identical 1-liner pattern.
+// NOTE(review): the same patch also adds lines to
+// be/test/io/compressed_input_stream_test.cpp — presumably that is where the
+// CompressedInputStream forwarder is exercised; confirm.
+TEST_F(PersistentIndexSstableTest, test_io_stats_snapshot_wrapper_forwarding) {
+    // Each scope below builds a fresh sentinel stream, wraps it, and checks that the
+    // wrapper's snapshot reports kSentinel in bytes_read_local_disk.
+    // NOTE(review): `std::make_unique()`, `std::make_shared()` and `std::unique_ptr(...)`
+    // look like they lost their template arguments in this copy of the patch.
+
+    // InputStreamWrapper
+    {
+        auto sentinel = std::make_unique();
+        // release() hands the raw pointer to a new unique_ptr that the wrapper
+        // receives, so exactly one owner exists at any time.
+        io::InputStreamWrapper w(std::unique_ptr(sentinel.release()));
+        EXPECT_EQ(SentinelSeekableStream::kSentinel, w.get_io_stats_snapshot().bytes_read_local_disk);
+    }
+    // SeekableInputStreamWrapper
+    {
+        auto sentinel = std::make_unique();
+        io::SeekableInputStreamWrapper w(std::move(sentinel));
+        EXPECT_EQ(SentinelSeekableStream::kSentinel, w.get_io_stats_snapshot().bytes_read_local_disk);
+    }
+    // SharedBufferedInputStream
+    {
+        // The last two wrappers are constructed from a shared_ptr to the sentinel
+        // rather than taking a unique_ptr.
+        auto sentinel = std::make_shared();
+        io::SharedBufferedInputStream w(sentinel, "" /*filename*/, 1 /*file_size*/);
+        EXPECT_EQ(SentinelSeekableStream::kSentinel, w.get_io_stats_snapshot().bytes_read_local_disk);
+    }
+    // BundleSeekableInputStream
+    {
+        auto sentinel = std::make_shared();
+        BundleSeekableInputStream w(sentinel, 0 /*offset*/, 1 /*size*/);
+        EXPECT_EQ(SentinelSeekableStream::kSentinel, w.get_io_stats_snapshot().bytes_read_local_disk);
+    }
+}
+
 } // namespace starrocks::lake