Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions be/src/fs/bundle_file.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,10 @@ StatusOr<std::unique_ptr<io::NumericStatistics>> BundleSeekableInputStream::get_
return _stream->get_numeric_statistics();
}

io::IoStatsSnapshot BundleSeekableInputStream::get_io_stats_snapshot() const {
return _stream->get_io_stats_snapshot();
}

const std::string& BundleSeekableInputStream::filename() const {
return _stream->filename();
}
Expand Down
1 change: 1 addition & 0 deletions be/src/fs/bundle_file.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ class BundleSeekableInputStream final : public io::SeekableInputStream {
StatusOr<int64_t> read(void* data, int64_t count) override;
Status touch_cache(int64_t offset, size_t length) override;
StatusOr<std::unique_ptr<io::NumericStatistics>> get_numeric_statistics() override;
io::IoStatsSnapshot get_io_stats_snapshot() const override;

// Override so two slices of the same physical file produce distinct keys at the same
// stream-relative offset: we fold the slice base offset into the key, yielding a key
Expand Down
14 changes: 14 additions & 0 deletions be/src/fs/fs_starlet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,20 @@ class StarletInputStream : public starrocks::io::SeekableInputStream {
return std::move(stats);
}

io::IoStatsSnapshot get_io_stats_snapshot() const override {
auto stream_st = _file_ptr->stream();
if (!stream_st.ok()) {
return {};
}
const auto& s = (*stream_st)->get_io_stats();
return {
s.bytes_read_local_disk, s.bytes_read_remote, s.bytes_write_local_disk, s.bytes_write_remote,
s.io_count_local_disk, s.io_count_remote, s.io_ns_read_local_disk, s.io_ns_read_remote,
s.io_ns_write_local_disk, s.io_ns_write_remote, s.prefetch_hit_count, s.prefetch_wait_finish_ns,
s.prefetch_pending_ns,
};
}

private:
ReadOnlyFilePtr _file_ptr;
};
Expand Down
1 change: 1 addition & 0 deletions be/src/io/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ add_library(IO STATIC
compressed_input_stream.cpp
fd_output_stream.cpp
fd_input_stream.cpp
input_stream.cpp
io_profiler.cpp
seekable_input_stream.cpp
readable.cpp
Expand Down
9 changes: 9 additions & 0 deletions be/src/io/compressed_input_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,4 +106,13 @@ Status CompressedInputStream::skip(int64_t n) {
return Status::OK();
}

// Out-of-line so gcov can attribute coverage to a concrete `.cpp` line.
IoStatsSnapshot CompressedInputStream::get_io_stats_snapshot() const {
return _source_stream->get_io_stats_snapshot();
}

IoStatsSnapshot CompressedSeekableInputStream::get_io_stats_snapshot() const {
return _source->get_io_stats_snapshot();
}

} // namespace starrocks::io
4 changes: 4 additions & 0 deletions be/src/io/compressed_input_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ class CompressedInputStream final : public InputStream {
return _source_stream->get_numeric_statistics();
}

IoStatsSnapshot get_io_stats_snapshot() const override;

private:
// Used to store the compressed data read from |_source_stream|.
class CompressedBuffer {
Expand Down Expand Up @@ -94,6 +96,8 @@ class CompressedSeekableInputStream final : public SeekableInputStream {
return _source->get_numeric_statistics();
}

IoStatsSnapshot get_io_stats_snapshot() const override;

Status seek(int64_t position) override { return Status::NotSupported(""); }
StatusOr<int64_t> position() override { return Status::NotSupported(""); }
StatusOr<int64_t> get_size() override { return Status::NotSupported(""); }
Expand Down
30 changes: 30 additions & 0 deletions be/src/io/input_stream.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "io/input_stream.h"

namespace starrocks::io {

// Out-of-line so gcov can attribute coverage to a concrete `.cpp` line: header-only
// inline definitions get inlined at every call site and the resulting `.gcda` data
// is keyed off the call site, leaving the header line counted as 0/0.
IoStatsSnapshot InputStream::get_io_stats_snapshot() const {
return {};
}

IoStatsSnapshot InputStreamWrapper::get_io_stats_snapshot() const {
return _impl->get_io_stats_snapshot();
}

} // namespace starrocks::io
35 changes: 35 additions & 0 deletions be/src/io/input_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,32 @@ namespace starrocks::io {

class NumericStatistics;

// Plain-old-data snapshot of cumulative read/write IO counters. Cheap drop-in alternative
// to `get_numeric_statistics()` for hot paths that only need a fixed set of counters
// (e.g. publish-time tracing): a single struct copy with no heap allocation, no string
// construction, and no vector growth. Streams that do not expose IO breakdown leave every
// field at zero. Adding a counter only requires extending this struct and the relevant
// concrete-stream override; the abstract interface stays the same.
struct IoStatsSnapshot {
// bytes
int64_t bytes_read_local_disk = 0;
int64_t bytes_read_remote = 0;
int64_t bytes_write_local_disk = 0;
int64_t bytes_write_remote = 0;
// op counts
int64_t io_count_local_disk = 0;
int64_t io_count_remote = 0;
// time (ns)
int64_t io_ns_read_local_disk = 0;
int64_t io_ns_read_remote = 0;
int64_t io_ns_write_local_disk = 0;
int64_t io_ns_write_remote = 0;
// prefetch
int64_t prefetch_hit_count = 0;
int64_t prefetch_wait_finish_ns = 0;
int64_t prefetch_pending_ns = 0;
};

// InputStream is the superclass of all classes representing an input stream of bytes.
class InputStream : public Readable {
public:
Expand All @@ -50,6 +76,13 @@ class InputStream : public Readable {
// If the InputStream implementation doesn't support statistics, a null pointer or
// an empty statistics is returned.
virtual StatusOr<std::unique_ptr<NumericStatistics>> get_numeric_statistics() { return nullptr; }

// Cheap fixed-shape variant of `get_numeric_statistics()`. Default returns all zeros.
// Prefer this over `get_numeric_statistics()` on hot paths (publish, multi_get, …) —
// it avoids the heap-allocated NumericStatistics object and the 11 std::string keys
// built by adapter streams. Out-of-line so gcov can credit a `.cpp` line; see
// `input_stream.cpp` for the definition.
virtual IoStatsSnapshot get_io_stats_snapshot() const;
};

class InputStreamWrapper : public InputStream {
Expand Down Expand Up @@ -81,6 +114,8 @@ class InputStreamWrapper : public InputStream {
return _impl->get_numeric_statistics();
}

IoStatsSnapshot get_io_stats_snapshot() const override;

private:
InputStream* _impl;
Ownership _ownership;
Expand Down
5 changes: 5 additions & 0 deletions be/src/io/seekable_input_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@

namespace starrocks::io {

// Out-of-line so gcov can attribute coverage to a concrete `.cpp` line.
IoStatsSnapshot SeekableInputStreamWrapper::get_io_stats_snapshot() const {
return _impl->get_io_stats_snapshot();
}

StatusOr<int64_t> SeekableInputStream::read_at(int64_t offset, void* data, int64_t count) {
RETURN_IF_ERROR(seek(offset));
return read(data, count);
Expand Down
2 changes: 2 additions & 0 deletions be/src/io/seekable_input_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,8 @@ class SeekableInputStreamWrapper : public SeekableInputStream {
return _impl->get_numeric_statistics();
}

IoStatsSnapshot get_io_stats_snapshot() const override;

StatusOr<int64_t> position() override { return _impl->position(); }

StatusOr<int64_t> read_at(int64_t offset, void* out, int64_t count) override {
Expand Down
5 changes: 5 additions & 0 deletions be/src/io/shared_buffered_input_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -305,4 +305,9 @@ int64_t SharedBufferedInputStream::current_range_ref_sum() const {
return ref;
}

// Out-of-line so gcov can attribute coverage to a concrete `.cpp` line.
IoStatsSnapshot SharedBufferedInputStream::get_io_stats_snapshot() const {
return _stream->get_io_stats_snapshot();
}

} // namespace starrocks::io
2 changes: 2 additions & 0 deletions be/src/io/shared_buffered_input_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ class SharedBufferedInputStream : public SeekableInputStream {
return _stream->get_numeric_statistics();
}

IoStatsSnapshot get_io_stats_snapshot() const override;

Status set_io_ranges(const std::vector<IORange>& ranges, bool coalesce_lazy_column = true);
void release_to_offset(int64_t offset);
void release();
Expand Down
33 changes: 33 additions & 0 deletions be/src/storage/lake/persistent_index_sstable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,12 @@

#include <butil/time.h> // NOLINT

#include <algorithm>

#include "fs/fs.h"
#include "fs/key_cache.h"
#include "gen_cpp/types.pb.h"
#include "io/input_stream.h"
#include "storage/lake/lake_delvec_loader.h"
#include "storage/lake/utils.h"
#include "storage/sstable/table_builder.h"
Expand All @@ -30,6 +33,17 @@ namespace starrocks::lake {

namespace {

// Cheap drop-in for `get_numeric_statistics()` on the publish hot path: a single
// virtual call returning a 13×int64 struct, no heap alloc / no strings / no vector.
// Plain POSIX (shared-nothing UT) returns all zero via the base default. Callers
// always pass non-null `_rf.get()` (set in `init()`) or non-null fresh `rf.get()`
// (from `ASSIGN_OR_RETURN`), so the only guard we need is against the rare case
// where the underlying file's stream pointer comes back null.
io::IoStatsSnapshot take_sstable_io_snapshot(RandomAccessFile* rf) {
auto stream = rf->stream();
return stream ? stream->get_io_stats_snapshot() : io::IoStatsSnapshot{};
}

Status drop_corrupted_sstable_cache(const std::string& path) {
#if defined(USE_STAROS) && !defined(BUILD_FORMAT_LIB)
if (!config::lake_clear_corrupted_cache_data) {
Expand Down Expand Up @@ -157,6 +171,11 @@ Status PersistentIndexSstable::multi_get(const Slice* keys, const KeyIndexSet& k
ASSIGN_OR_RETURN(rf, fs::new_random_access_file(opts, _rf->filename()));
}
options.file = rf.get();
// When parallel execution opens a fresh `rf`, its IO counters start at 0 so the delta below
// equals the absolute IO done by this multi_get. Otherwise `_rf` is reused across calls and
// we measure the delta against its running totals.
RandomAccessFile* active_rf = (rf != nullptr) ? rf.get() : _rf.get();
io::IoStatsSnapshot io_snap_before = take_sstable_io_snapshot(active_rf);
// Currently, there is no need to set predicate for MultiGet of persistent index sstable. Because predicate
// only used for sstable compaction to filter out some keys for tablet split purpose and such keys can not
// be read by the persistent index by designed. So even we provide a predicate, all keys read by multi_get
Expand All @@ -178,9 +197,23 @@ Status PersistentIndexSstable::multi_get(const Slice* keys, const KeyIndexSet& k
return multiget_st;
}
auto end_ts = butil::gettimeofday_us();
io::IoStatsSnapshot io_snap_after = take_sstable_io_snapshot(active_rf);
TRACE_COUNTER_INCREMENT("multi_get_us", end_ts - start_ts);
TRACE_COUNTER_INCREMENT("read_block_hit_cache_cnt", stat.block_cnt_from_cache);
TRACE_COUNTER_INCREMENT("read_block_miss_cache_cnt", stat.block_cnt_from_file);
// Break down the misses into reads served by the local data cache vs. reads that went out to
// the remote object store (S3/OSS/etc.). Deltas are clamped at 0 because cumulative counters
// should never decrease, but we guard against the file being swapped underneath us in retries.
TRACE_COUNTER_INCREMENT(
"sstable_io_local_disk_bytes",
std::max<int64_t>(0, io_snap_after.bytes_read_local_disk - io_snap_before.bytes_read_local_disk));
TRACE_COUNTER_INCREMENT("sstable_io_remote_bytes",
std::max<int64_t>(0, io_snap_after.bytes_read_remote - io_snap_before.bytes_read_remote));
TRACE_COUNTER_INCREMENT(
"sstable_io_count_local_disk",
std::max<int64_t>(0, io_snap_after.io_count_local_disk - io_snap_before.io_count_local_disk));
TRACE_COUNTER_INCREMENT("sstable_io_count_remote",
std::max<int64_t>(0, io_snap_after.io_count_remote - io_snap_before.io_count_remote));
size_t i = 0;
for (auto& key_index : key_indexes) {
// Index_value_with_vers is empty means key is not found in sst.
Expand Down
Loading
Loading