diff --git a/Cargo.lock b/Cargo.lock index a6966dd16d..63470a3bf3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9574,7 +9574,6 @@ dependencies = [ "lazy_static", "libc", "md-5 0.11.0", - "memmap2 0.9.10", "metrics", "num_cpus", "opentelemetry 0.32.0", diff --git a/crates/ecstore/Cargo.toml b/crates/ecstore/Cargo.toml index e3dfe1fd66..f39dd3fc85 100644 --- a/crates/ecstore/Cargo.toml +++ b/crates/ecstore/Cargo.toml @@ -101,7 +101,6 @@ num_cpus = { workspace = true } rand.workspace = true pin-project-lite.workspace = true md-5.workspace = true -memmap2 = { workspace = true } libc.workspace = true rustix = { workspace = true } rustfs-madmin.workspace = true diff --git a/crates/ecstore/src/bitrot.rs b/crates/ecstore/src/bitrot.rs index d3a599a3b2..ea2ce48a55 100644 --- a/crates/ecstore/src/bitrot.rs +++ b/crates/ecstore/src/bitrot.rs @@ -21,6 +21,19 @@ use std::time::Instant; use tokio::io::AsyncRead; use tracing::debug; +/// Adjusts a raw (offset, length) pair to account for per-shard checksum overhead. +/// Returns (adjusted_offset, adjusted_length). +pub(crate) fn adjust_shard_read_params( + offset: usize, + length: usize, + shard_size: usize, + checksum_algo: &HashAlgorithm, +) -> (usize, usize) { + let adj_len = length.div_ceil(shard_size) * checksum_algo.size() + length; + let adj_off = offset.div_ceil(shard_size) * checksum_algo.size() + offset; + (adj_off, adj_len) +} + /// Create a BitrotReader from either inline data or disk file stream /// /// # Parameters @@ -33,7 +46,7 @@ use tracing::debug; /// * `shard_size` - Shard size for erasure coding /// * `checksum_algo` - Hash algorithm for bitrot verification /// * `skip_verify` - If true, skip checksum verification -/// * `use_zero_copy` - If true, use zero-copy read (mmap on Unix) +/// * `use_zero_copy` - If true, use zero-copy read (pread on Unix) #[allow(clippy::too_many_arguments)] pub async fn create_bitrot_reader( inline_data: Option<&[u8]>, @@ -48,8 +61,7 @@ pub async fn create_bitrot_reader( use_zero_copy: bool, ) -> disk::error::Result>>> { // Calculate the total length to read, including the checksum overhead - let length = length.div_ceil(shard_size) * checksum_algo.size() + length; - let offset = offset.div_ceil(shard_size) * checksum_algo.size() + offset; + let (offset, length) = adjust_shard_read_params(offset, length, shard_size, &checksum_algo); if let Some(data) = inline_data { // Use inline data let mut rd = Cursor::new(Bytes::copy_from_slice(data)); @@ -336,4 +348,23 @@ mod tests { println!("error: {error:?}"); assert_eq!(error, DiskError::DiskNotFound); } + + #[test] + fn test_adjust_shard_read_params_no_checksum() { + let algo = HashAlgorithm::None; + let (adj_off, adj_len) = adjust_shard_read_params(10, 100, 64, &algo); + assert_eq!(adj_off, 10); + assert_eq!(adj_len, 100); + } + + #[test] + fn test_adjust_shard_read_params_known_values() { + // shard_size=128, SHA256.size()=32, offset=0, length=128 + // adj_len = 128.div_ceil(128) * 32 + 128 = 1 * 32 + 128 = 160 + // adj_off = 0.div_ceil(128) * 32 + 0 = 0 + let algo = HashAlgorithm::SHA256; + let (adj_off, adj_len) = adjust_shard_read_params(0, 128, 128, &algo); + assert_eq!(adj_off, 0); + assert_eq!(adj_len, 160); + } } diff --git a/crates/ecstore/src/disk/disk_store.rs b/crates/ecstore/src/disk/disk_store.rs index ed664e3948..c1424f11f0 100644 --- a/crates/ecstore/src/disk/disk_store.rs +++ b/crates/ecstore/src/disk/disk_store.rs @@ -37,7 +37,7 @@ use std::{ }; use tokio::{sync::RwLock, time}; use tokio_util::sync::CancellationToken; -use tracing::{info, warn}; +use tracing::{Instrument, debug_span, info, warn}; use uuid::Uuid; /// Disk health status constants @@ -495,6 +495,10 @@ impl LocalDiskWrapper { self.disk.clone() } + pub fn get_object_path_if_local(&self, volume: &str, path: &str) -> crate::disk::error::Result { + self.disk.get_object_path(volume, path) + } + pub fn runtime_state(&self) -> RuntimeDriveHealthState { self.health.runtime_state() } @@ -834,7 +838,9 @@ impl LocalDiskWrapper { } // Check if disk is stale - self.check_disk_stale().await?; + self.check_disk_stale() + .instrument(debug_span!(target: "rustfs_get_trace", "get.xl_stale_check")) + .await?; // Record operation start let now = std::time::SystemTime::now() diff --git a/crates/ecstore/src/disk/local.rs b/crates/ecstore/src/disk/local.rs index e401f67f92..da4385ccee 100644 --- a/crates/ecstore/src/disk/local.rs +++ b/crates/ecstore/src/disk/local.rs @@ -56,10 +56,10 @@ use std::{ }; use time::OffsetDateTime; use tokio::fs::{self, File}; -use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWrite, AsyncWriteExt, ErrorKind}; +use tokio::io::{AsyncSeekExt, AsyncWrite, AsyncWriteExt, ErrorKind}; use tokio::sync::RwLock; use tokio::time::interval; -use tracing::{debug, error, info, warn}; +use tracing::{Instrument, debug, debug_span, error, info, warn}; use uuid::Uuid; const DELETED_OBJECTS_CLEANUP_INTERVAL: Duration = Duration::from_secs(60 * 5); @@ -293,7 +293,7 @@ fn should_reclaim_file_cache_after_write(file_size: i64) -> bool { file_size as usize >= threshold } -fn should_reclaim_file_cache_after_read(length: usize) -> bool { +pub(crate) fn should_reclaim_file_cache_after_read(length: usize) -> bool { if length == 0 { return false; } @@ -1027,40 +1027,53 @@ impl LocalDisk { volume_dir: impl AsRef, file_path: impl AsRef, ) -> Result<(Vec, Option)> { - let mut f = match super::fs::open_file(file_path.as_ref(), O_RDONLY).await { - Ok(f) => f, - Err(e) => { - if e.kind() == ErrorKind::NotFound - && !skip_access_checks(volume) - && let Err(er) = access(volume_dir.as_ref()).await - && er.kind() == ErrorKind::NotFound - { - warn!("read_all_data_with_dmtime os err {:?}", &er); - return Err(DiskError::VolumeNotFound); + let volume = volume.to_string(); + let volume_dir = volume_dir.as_ref().to_path_buf(); + let file_path = file_path.as_ref().to_path_buf(); + + tokio::task::spawn_blocking(move || { + let mut f = { + let _span = debug_span!(target: "rustfs_get_trace", "get.xl_fs_open").entered(); + match std::fs::File::open(&file_path) { + Ok(f) => f, + Err(e) => { + if e.kind() == std::io::ErrorKind::NotFound + && !skip_access_checks(&volume) + && let Err(er) = std::fs::metadata(&volume_dir) + && er.kind() == std::io::ErrorKind::NotFound + { + warn!("read_all_data_with_dmtime os err {:?}", &er); + return Err(DiskError::VolumeNotFound); + } + return Err(to_file_error(e).into()); + } } + }; - return Err(to_file_error(e).into()); - } - }; - - let meta = f.metadata().await.map_err(to_file_error)?; - - if meta.is_dir() { - return Err(DiskError::FileNotFound); - } + let meta = { + let _span = debug_span!(target: "rustfs_get_trace", "get.xl_fs_stat").entered(); + f.metadata().map_err(to_file_error).map_err(DiskError::from)? + }; - let size = meta.len() as usize; - let mut bytes = Vec::new(); - bytes.try_reserve_exact(size).map_err(Error::other)?; + if meta.is_dir() { + return Err(DiskError::FileNotFound); + } - f.read_to_end(&mut bytes).await.map_err(to_file_error)?; + let size = meta.len() as usize; + let mut bytes = Vec::new(); + bytes.try_reserve_exact(size).map_err(Error::other)?; - let modtime = match meta.modified() { - Ok(md) => Some(OffsetDateTime::from(md)), - Err(_) => None, - }; + { + let _span = debug_span!(target: "rustfs_get_trace", "get.xl_fs_read").entered(); + use std::io::Read; + f.read_to_end(&mut bytes).map_err(to_file_error).map_err(DiskError::from)?; + } - Ok((bytes, modtime)) + let modtime = meta.modified().ok().map(OffsetDateTime::from); + Ok((bytes, modtime)) + }) + .await + .map_err(DiskError::from)? } async fn delete_versions_internal(&self, volume: &str, path: &str, fis: &[FileInfo]) -> Result<()> { @@ -2184,11 +2197,8 @@ impl DiskAPI for LocalDisk { Ok(Box::new(FileCacheReclaimReader::new(f, offset as u64, length, reclaim_on_drop))) } - /// Zero-copy file read using memory mapping (Unix) or efficient read (non-Unix). + /// File read using pread (Unix) or efficient read (non-Unix). /// Returns Bytes that can be shared without copying. - // SAFETY: Unix unsafe calls in this function only query page size and mmap - // a read-only file region after bounds and alignment are validated. - #[allow(unsafe_code)] #[tracing::instrument(level = "debug", skip(self))] async fn read_file_zero_copy(&self, volume: &str, path: &str, offset: usize, length: usize) -> Result { let volume_dir = self.get_bucket_path(volume)?; @@ -2218,80 +2228,48 @@ impl DiskAPI for LocalDisk { return Err(DiskError::FileCorrupt); } - // Unix: use mmap to read the data (copies into Bytes for safe ownership) - // Non-Unix: fall back to efficient read + // Unix: use pread (read_at) to read the data without mmap overhead. + // Non-Unix: fall back to efficient read. #[cfg(unix)] { - use memmap2::MmapOptions; use std::time::Instant; let start = Instant::now(); let file_path_clone = file_path.clone(); - let should_reclaim_after_read = should_reclaim_file_cache_after_read(length); let bytes = tokio::task::spawn_blocking(move || { + use std::os::unix::fs::FileExt; let file = std::fs::File::open(&file_path_clone).map_err(DiskError::from)?; - #[cfg(target_os = "macos")] - if should_reclaim_after_read { - let _ = set_std_fd_nocache(&file); - } - - // mmap offsets on Unix must be page-size aligned. Align the - // mapping down to the nearest page boundary, then slice out the - // originally requested logical range. - // SAFETY: `sysconf(_SC_PAGESIZE)` has no pointer arguments and - // only queries process-global OS configuration. - let page_size = unsafe { libc::sysconf(libc::_SC_PAGESIZE) }; - if page_size <= 0 { - return Err(DiskError::other("failed to determine system page size")); + let mut buf = vec![0u8; length]; + let mut total = 0usize; + while total < length { + let nbytes = file + .read_at(&mut buf[total..], (offset + total) as u64) + .map_err(DiskError::from)?; + if nbytes == 0 { + return Err(DiskError::FileCorrupt); + } + total += nbytes; } - let page_size = page_size as u64; - let offset_u64 = offset as u64; - let aligned_offset = offset_u64 - (offset_u64 % page_size); - let logical_offset = (offset_u64 - aligned_offset) as usize; - let map_len = logical_offset - .checked_add(length) - .ok_or_else(|| DiskError::other("mmap length overflow"))?; - - // SAFETY: The file is opened as read-only, and we're mapping a region - // that we've already verified exists and is within file bounds. The - // file offset passed to mmap is page-size aligned as required on Unix. - let mmap = - unsafe { MmapOptions::new().offset(aligned_offset).len(map_len).map(&file) }.map_err(DiskError::other)?; - - // Copy only the requested logical range into a Bytes buffer. This - // avoids undefined behavior from treating OS-managed mmap memory as - // allocator-managed Vec storage, at the cost of an extra copy. - let end = logical_offset - .checked_add(length) - .ok_or_else(|| DiskError::other("mmap slice length overflow"))?; - let bytes = Bytes::copy_from_slice(&mmap[logical_offset..end]); #[cfg(target_os = "linux")] - if should_reclaim_after_read { + if should_reclaim_file_cache_after_read(length) { use core::num::NonZeroU64; use rustix::fs::{Advice, fadvise}; - - let reclaim_len = - NonZeroU64::new(map_len as u64).ok_or_else(|| DiskError::other("mmap reclaim length overflow"))?; - fadvise(&file, aligned_offset, Some(reclaim_len), Advice::DontNeed) - .map_err(std::io::Error::from) - .map_err(DiskError::from)?; + if let Some(reclaim_len) = NonZeroU64::new(length as u64) { + let _ = fadvise(&file, offset as u64, Some(reclaim_len), Advice::DontNeed); + } } - Ok::(bytes) + Ok::(Bytes::from(buf)) }) .await .map_err(DiskError::from)??; - // Log successful mmap read metrics let duration_ms = start.elapsed().as_secs_f64() * 1000.0; - - // Record mmap read metrics rustfs_io_metrics::record_zero_copy_read(length, duration_ms); - - debug!(size = length, duration_ms = duration_ms, "mmap_read_success"); + debug!(size = length, duration_ms = duration_ms, "pread_read_success"); return Ok(bytes); } @@ -2299,6 +2277,8 @@ impl DiskAPI for LocalDisk { // Non-Unix fallback: efficient read into Bytes #[cfg(not(unix))] { + use tokio::io::AsyncReadExt; + // Record zero-copy fallback rustfs_io_metrics::record_zero_copy_fallback("non_unix_platform"); @@ -2792,6 +2772,7 @@ impl DiskAPI for LocalDisk { let (data, _) = self .read_raw(volume, volume_dir.clone(), file_path, read_data) + .instrument(debug_span!(target: "rustfs_get_trace", "get.xl_meta_read")) .await .map_err(|e| { if e == DiskError::FileNotFound && !version_id.is_empty() { @@ -3107,6 +3088,84 @@ async fn get_disk_info(drive_path: PathBuf) -> Result<(rustfs_utils::os::DiskInf Ok((disk_info, root_drive)) } +/// Reads N shard files in a single blocking thread using pread (FileExt::read_at). +/// +/// Each request is `(file_path, offset, length)`. Returns `Vec>` in +/// the same order. All shards for one EC part are batched into one `spawn_blocking` +/// call to eliminate per-shard async re-poll delay while keeping total blocking-thread +/// occupancy to one event per GET. +#[cfg(unix)] +pub(crate) async fn batch_shard_pread( + requests: Vec<(std::path::PathBuf, usize, usize)>, +) -> Vec> { + use tracing::debug; + let n = requests.len(); + tokio::task::spawn_blocking(move || { + use std::os::unix::fs::FileExt; + use std::time::Instant; + let batch_start = Instant::now(); + let mut results = Vec::with_capacity(n); + + for (i, (file_path, offset, length)) in requests.into_iter().enumerate() { + let mut stat_us: u128 = 0; + let mut open_us: u128 = 0; + let mut pread_us: u128 = 0; + + let r = (|| -> crate::disk::error::Result { + let t = Instant::now(); + let meta = std::fs::metadata(&file_path).map_err(DiskError::from)?; + stat_us = t.elapsed().as_micros(); + + let end = offset.checked_add(length).ok_or(DiskError::FileCorrupt)?; + if meta.len() < end as u64 { + return Err(DiskError::FileCorrupt); + } + + let t = Instant::now(); + let file = std::fs::File::open(&file_path).map_err(DiskError::from)?; + open_us = t.elapsed().as_micros(); + + let mut buf = vec![0u8; length]; + let t = Instant::now(); + let mut total = 0usize; + while total < length { + let nbytes = file + .read_at(&mut buf[total..], (offset + total) as u64) + .map_err(DiskError::from)?; + if nbytes == 0 { + return Err(DiskError::FileCorrupt); + } + total += nbytes; + } + pread_us = t.elapsed().as_micros(); + + Ok(bytes::Bytes::from(buf)) + })(); + + debug!(target: "rustfs_get_trace", i = i, batch_stat_us = stat_us, "batch_stat"); + debug!(target: "rustfs_get_trace", i = i, batch_open_us = open_us, "batch_open"); + debug!(target: "rustfs_get_trace", i = i, batch_pread_us = pread_us, "batch_pread"); + + results.push(r); + } + + let batch_blocking_done = Instant::now(); + let batch_blocking_us = batch_start.elapsed().as_micros(); + debug!(target: "rustfs_get_trace", batch_blocking_us = batch_blocking_us, n = n, "batch_blocking"); + (results, batch_blocking_done) + }) + .await + .map(|(results, blocking_done)| { + let batch_repoll_us = blocking_done.elapsed().as_micros(); + tracing::debug!(target: "rustfs_get_trace", batch_repoll_us = batch_repoll_us, n = n, "batch_repoll"); + results + }) + .unwrap_or_else(|e| { + let msg = format!("spawn_blocking join: {e}"); + (0..n).map(|_| Err(DiskError::other(msg.clone()))).collect() + }) +} + #[cfg(test)] mod test { use super::*; @@ -3975,4 +4034,122 @@ mod test { }); }); } + + #[tokio::test] + async fn test_read_all_data_with_dmtime_round_trip() { + use tempfile::tempdir; + + let dir = tempdir().unwrap(); + let endpoint = Endpoint::try_from(dir.path().to_str().unwrap()).unwrap(); + let disk = LocalDisk::new(&endpoint, false).await.unwrap(); + + let bucket = "my-test-bucket"; + disk.make_volume(bucket).await.unwrap(); + let data = b"hello round trip"; + disk.write_all(bucket, "object/data", Bytes::copy_from_slice(data)) + .await + .unwrap(); + + let volume_dir = disk.get_bucket_path(bucket).unwrap(); + let file_path = disk.get_object_path(bucket, "object/data").unwrap(); + + let (content, modtime) = disk.read_all_data_with_dmtime(bucket, &volume_dir, &file_path).await.unwrap(); + assert_eq!(content, data); + assert!(modtime.is_some()); + } + + #[tokio::test] + async fn test_read_all_data_with_dmtime_file_not_found() { + use tempfile::tempdir; + + let dir = tempdir().unwrap(); + let endpoint = Endpoint::try_from(dir.path().to_str().unwrap()).unwrap(); + let disk = LocalDisk::new(&endpoint, false).await.unwrap(); + + let bucket = "my-test-bucket"; + disk.make_volume(bucket).await.unwrap(); + + let volume_dir = disk.get_bucket_path(bucket).unwrap(); + let file_path = volume_dir.join("nonexistent.txt"); + + let result = disk.read_all_data_with_dmtime(bucket, &volume_dir, &file_path).await; + assert!(matches!(result, Err(DiskError::FileNotFound))); + } + + #[tokio::test] + async fn test_read_all_data_with_dmtime_volume_not_found() { + use tempfile::tempdir; + + let dir = tempdir().unwrap(); + let endpoint = Endpoint::try_from(dir.path().to_str().unwrap()).unwrap(); + let disk = LocalDisk::new(&endpoint, false).await.unwrap(); + + // A plain bucket whose directory was never created; skip_access_checks returns false for it + let bucket = "regular-bucket"; + let volume_dir = disk.root.join(bucket); + let file_path = volume_dir.join("object.dat"); + + let result = disk.read_all_data_with_dmtime(bucket, &volume_dir, &file_path).await; + assert!(matches!(result, Err(DiskError::VolumeNotFound))); + } + + #[cfg(unix)] + #[tokio::test] + async fn test_batch_shard_pread_basic() { + use tempfile::tempdir; + + let dir = tempdir().unwrap(); + let payloads: &[&[u8]] = &[b"aaaaaa", b"bbbbbb", b"cccccc"]; + let mut requests = Vec::new(); + for (i, payload) in payloads.iter().enumerate() { + let p = dir.path().join(format!("shard-{i}.bin")); + std::fs::write(&p, payload).unwrap(); + requests.push((p, 0usize, payload.len())); + } + + let results = batch_shard_pread(requests).await; + assert_eq!(results.len(), payloads.len()); + for (result, expected) in results.iter().zip(payloads.iter()) { + assert_eq!(result.as_ref().unwrap().as_ref(), *expected); + } + } + + #[cfg(unix)] + #[tokio::test] + async fn test_batch_shard_pread_partial_errors() { + use tempfile::tempdir; + + let dir = tempdir().unwrap(); + let good_path = dir.path().join("good.bin"); + std::fs::write(&good_path, b"good data").unwrap(); + let missing_path = dir.path().join("does-not-exist.bin"); + + let requests = vec![(good_path, 0usize, 9usize), (missing_path, 0usize, 4usize)]; + + let results = batch_shard_pread(requests).await; + assert_eq!(results.len(), 2); + assert!(results[0].is_ok()); + assert_eq!(results[0].as_ref().unwrap().as_ref(), b"good data"); + assert!(results[1].is_err()); + } + + #[tokio::test] + async fn test_read_file_zero_copy_pread_basic() { + use tempfile::tempdir; + + let dir = tempdir().unwrap(); + let endpoint = Endpoint::try_from(dir.path().to_str().unwrap()).unwrap(); + let disk = LocalDisk::new(&endpoint, false).await.unwrap(); + + let bucket = "test-bucket-zc"; + disk.make_volume(bucket).await.unwrap(); + let data = b"0123456789abcdef"; + disk.write_all(bucket, "shard.bin", Bytes::copy_from_slice(data)) + .await + .unwrap(); + + // Read sub-range [4..12] (offset=4, length=8) + let result = disk.read_file_zero_copy(bucket, "shard.bin", 4, 8).await.unwrap(); + assert_eq!(result.as_ref(), b"456789ab"); + } } diff --git a/crates/ecstore/src/disk/mod.rs b/crates/ecstore/src/disk/mod.rs index 6e0ebeba11..ef9d9343a8 100644 --- a/crates/ecstore/src/disk/mod.rs +++ b/crates/ecstore/src/disk/mod.rs @@ -470,6 +470,15 @@ impl Disk { Disk::Remote(remote_disk) => remote_disk.enable_health_check(), } } + + /// Returns the absolute filesystem path for a (volume, path) pair if this + /// disk is local, or `None` if it is a remote disk. + pub fn get_object_path_if_local(&self, volume: &str, path: &str) -> Option> { + match self { + Disk::Local(w) => Some(w.get_object_path_if_local(volume, path)), + Disk::Remote(_) => None, + } + } } pub async fn new_disk(ep: &Endpoint, opt: &DiskOption) -> Result { diff --git a/crates/ecstore/src/erasure_coding/decode.rs b/crates/ecstore/src/erasure_coding/decode.rs index 56668891e2..854a9a6b5c 100644 --- a/crates/ecstore/src/erasure_coding/decode.rs +++ b/crates/ecstore/src/erasure_coding/decode.rs @@ -22,7 +22,7 @@ use std::io::ErrorKind; use tokio::io::AsyncRead; use tokio::io::AsyncWrite; use tokio::io::AsyncWriteExt; -use tracing::error; +use tracing::{Instrument, debug_span, error}; pin_project! { pub(crate) struct ParallelReader { @@ -271,7 +271,10 @@ impl Erasure { break; } - let (mut shards, errs) = reader.read().await; + let (mut shards, errs) = reader + .read() + .instrument(debug_span!(target: "rustfs_get_trace", "get.shard_read")) + .await; if ret_err.is_none() && let (_, Some(err)) = reduce_errs(&errs, &[]) @@ -287,13 +290,16 @@ impl Erasure { } // Decode the shards - if let Err(e) = self.decode_data(&mut shards) { + if let Err(e) = debug_span!(target: "rustfs_get_trace", "get.ec_decode").in_scope(|| self.decode_data(&mut shards)) { error!("erasure decode decode_data err: {:?}", e); ret_err = Some(e); break; } - let n = match write_data_blocks(writer, &shards, self.data_shards, block_offset, block_length).await { + let n = match write_data_blocks(writer, &shards, self.data_shards, block_offset, block_length) + .instrument(debug_span!(target: "rustfs_get_trace", "get.pipe_write")) + .await + { Ok(n) => n, Err(e) => { error!("erasure decode write_data_blocks err: {:?}", e); diff --git a/crates/ecstore/src/set_disk.rs b/crates/ecstore/src/set_disk.rs index 69f38d24c2..19dad11362 100644 --- a/crates/ecstore/src/set_disk.rs +++ b/crates/ecstore/src/set_disk.rs @@ -667,6 +667,7 @@ impl ObjectIO for SetDisks { .new_ns_lock(bucket, object) .await? .get_read_lock_with_metadata(get_lock_acquire_timeout(), read_lock_metadata(opts)) + .instrument(debug_span!(target: "rustfs_get_trace", "get.ns_lock")) .await .map_err(|e| { Error::other(format!( @@ -689,9 +690,12 @@ impl ObjectIO for SetDisks { let (fi, files, disks) = self .get_object_fileinfo(bucket, object, opts, true) + .instrument(debug_span!(target: "rustfs_get_trace", "get.xlmeta_read")) .await .map_err(|err| to_object_err(err, vec![bucket, object]))?; let object_info = ObjectInfo::from_file_info(&fi, bucket, object, opts.versioned || opts.version_suspended); + let pre_spawn_span = debug_span!(target: "rustfs_get_trace", "get.pre_spawn"); + let _pre_spawn_guard = pre_spawn_span.enter(); if object_info.delete_marker { if opts.version_id.is_none() { @@ -764,7 +768,9 @@ impl ObjectIO for SetDisks { // Move the read-lock guard into the task so it lives for the duration of the read // Note: when lock optimization is enabled, read_lock_guard is None // let _guard_to_hold = _read_lock_guard; // moved into closure below + drop(_pre_spawn_guard); tokio::spawn(async move { + drop(pre_spawn_span); let _guard = read_lock_guard; // keep guard alive until task ends (None if optimization enabled) let mut writer = wd; // Do not wrap the entire read+write pipeline in `disk_read_timeout`. @@ -784,6 +790,7 @@ impl ObjectIO for SetDisks { pool_index, skip_verify, ) + .instrument(debug_span!(target: "rustfs_get_trace", "get.spawn_task")) .await { error!("get_object_with_fileinfo {bucket}/{object} err {:?}", e); diff --git a/crates/ecstore/src/set_disk/read.rs b/crates/ecstore/src/set_disk/read.rs index ce8f2e2036..e45d2611ef 100644 --- a/crates/ecstore/src/set_disk/read.rs +++ b/crates/ecstore/src/set_disk/read.rs @@ -16,6 +16,7 @@ use super::*; use rustfs_config::{DEFAULT_OBJECT_ZERO_COPY_ENABLE, ENV_OBJECT_ZERO_COPY_ENABLE}; use std::future::Future; use tokio::task::JoinSet; +use tracing::{Instrument, debug_span}; async fn collect_read_multiple_results( tasks: Vec, @@ -235,17 +236,31 @@ impl SetDisks { let bucket = bucket.clone(); let object = object.clone(); let version_id = version_id.clone(); - tokio::spawn(async move { - if let Some(disk) = disk { - disk.read_version(&org_bucket, &bucket, &object, &version_id, &opts).await - } else { - Err(DiskError::DiskNotFound) + let spawn_time = std::time::Instant::now(); + let xl_task_span = debug_span!( + target: "rustfs_get_trace", "get.xl_task", + sched_us = tracing::field::Empty + ); + let xl_task_span_inner = xl_task_span.clone(); + tokio::spawn( + async move { + xl_task_span_inner.record("sched_us", spawn_time.elapsed().as_micros()); + if let Some(disk) = disk { + disk.read_version(&org_bucket, &bucket, &object, &version_id, &opts) + .instrument(debug_span!(target: "rustfs_get_trace", "get.xl_disk_read")) + .await + } else { + Err(DiskError::DiskNotFound) + } } - }) + .instrument(xl_task_span), + ) }); // Wait for all tasks to complete - let results = join_all(futures).await; + let results = join_all(futures) + .instrument(debug_span!(target: "rustfs_get_trace", "get.xl_fanout")) + .await; for result in results { match result { @@ -547,7 +562,11 @@ impl SetDisks { opts: &ObjectOptions, read_data: bool, ) -> Result<(FileInfo, Vec, Vec>)> { - let disks = self.disks.read().await; + let disks = self + .disks + .read() + .instrument(debug_span!(target: "rustfs_get_trace", "get.disks_lock")) + .await; let disks = disks.clone(); @@ -561,36 +580,43 @@ impl SetDisks { let _min_disks = self.set_drive_count - self.default_parity_count; - let (read_quorum, _) = match Self::object_quorum_from_meta(&parts_metadata, &errs, self.default_parity_count) - .map_err(|err| to_object_err(err.into(), vec![bucket, object])) - { - Ok(v) => v, - Err(e) => { - // error!("Self::object_quorum_from_meta: {:?}, bucket: {}, object: {}", &e, bucket, object); - return Err(e); - } - }; + let (fi, op_online_disks) = async { + let (read_quorum, _) = match Self::object_quorum_from_meta(&parts_metadata, &errs, self.default_parity_count) + .map_err(|err| to_object_err(err.into(), vec![bucket, object])) + { + Ok(v) => v, + Err(e) => { + // error!("Self::object_quorum_from_meta: {:?}, bucket: {}, object: {}", &e, bucket, object); + return Err(e); + } + }; - if let Some(err) = reduce_read_quorum_errs(&errs, OBJECT_OP_IGNORED_ERRS, read_quorum as usize) { - error!("reduce_read_quorum_errs: {:?}, bucket: {}, object: {}", &err, bucket, object); - return Err(to_object_err(err.into(), vec![bucket, object])); - } + if let Some(err) = reduce_read_quorum_errs(&errs, OBJECT_OP_IGNORED_ERRS, read_quorum as usize) { + error!("reduce_read_quorum_errs: {:?}, bucket: {}, object: {}", &err, bucket, object); + return Err(to_object_err(err.into(), vec![bucket, object])); + } - let (op_online_disks, mot_time, etag) = Self::list_online_disks(&disks, &parts_metadata, &errs, read_quorum as usize); - - let fi = Self::pick_valid_fileinfo(&parts_metadata, mot_time, etag, read_quorum as usize)?; - if errs.iter().any(|err| err.is_some()) { - let _ = - rustfs_common::heal_channel::send_heal_request(rustfs_common::heal_channel::create_heal_request_with_options( - fi.volume.to_string(), // bucket - Some(fi.name.to_string()), // object_prefix - false, // force_start - Some(HealChannelPriority::Normal), // priority - Some(self.pool_index), // pool_index - Some(self.set_index), // set_index - )) + let (op_online_disks, mot_time, etag) = Self::list_online_disks(&disks, &parts_metadata, &errs, read_quorum as usize); + + let fi = Self::pick_valid_fileinfo(&parts_metadata, mot_time, etag, read_quorum as usize)?; + if errs.iter().any(|err| err.is_some()) { + let _ = rustfs_common::heal_channel::send_heal_request( + rustfs_common::heal_channel::create_heal_request_with_options( + fi.volume.to_string(), // bucket + Some(fi.name.to_string()), // object_prefix + false, // force_start + Some(HealChannelPriority::Normal), // priority + Some(self.pool_index), // pool_index + Some(self.set_index), // set_index + ), + ) .await; + } + Ok((fi, op_online_disks)) } + .instrument(debug_span!(target: "rustfs_get_trace", "get.xl_postfanout")) + .await?; + // debug!("get_object_fileinfo pick fi {:?}", &fi); // let online_disks: Vec> = op_online_disks.iter().filter(|v| v.is_some()).cloned().collect(); @@ -755,37 +781,113 @@ impl SetDisks { // Default: enabled (true) for performance let use_zero_copy = rustfs_utils::get_env_bool(ENV_OBJECT_ZERO_COPY_ENABLE, DEFAULT_OBJECT_ZERO_COPY_ENABLE); - let mut readers = Vec::with_capacity(disks.len()); - let mut errors = Vec::with_capacity(disks.len()); - for (idx, disk_op) in disks.iter().enumerate() { - match create_bitrot_reader( - files[idx].data.as_deref(), - disk_op.as_ref(), - bucket, - &format!("{}/{}/part.{}", object, files[idx].data_dir.unwrap_or_default(), part_number), - read_offset, - till_offset.saturating_sub(read_offset), - erasure.shard_size(), - checksum_algo.clone(), - skip_verify_bitrot, - use_zero_copy, - ) - .await + let (readers, errors) = async { + // Unix fast-path: batch all local shard reads into one spawn_blocking. + // cfg-gated because batch_shard_pread and adjust_shard_read_params are + // unix-only. Falls through to the sequential path on non-unix or when + // any shard is inline / not local. + #[cfg(unix)] { - Ok(Some(reader)) => { - readers.push(Some(reader)); - errors.push(None); + use crate::bitrot::adjust_shard_read_params; + use crate::disk::local::batch_shard_pread; + + let mut batch_items: Vec<(usize, std::path::PathBuf, usize, usize)> = Vec::new(); + let mut can_batch = use_zero_copy; + let mut readers: Vec<_> = (0..disks.len()).map(|_| None).collect(); + let mut errors: Vec<_> = (0..disks.len()).map(|_| Some(DiskError::DiskNotFound)).collect(); + + if can_batch { + for (idx, disk_op) in disks.iter().enumerate() { + if files[idx].data.is_some() { + // Inline shard — fall back to sequential path. + can_batch = false; + break; + } else if let Some(disk) = disk_op.as_ref() { + let path_str = + format!("{}/{}/part.{}", object, files[idx].data_dir.unwrap_or_default(), part_number); + let (adj_off, adj_len) = adjust_shard_read_params( + read_offset, + till_offset.saturating_sub(read_offset), + erasure.shard_size(), + &checksum_algo, + ); + match disk.get_object_path_if_local(bucket, &path_str) { + Some(Ok(p)) => { + batch_items.push((idx, p, adj_off, adj_len)); + } + _ => { + can_batch = false; + break; + } + } + } + // disk_op is None → stays DiskNotFound (valid: missing/offline shard) + } } - Ok(None) => { - readers.push(None); - errors.push(Some(DiskError::DiskNotFound)); + + if can_batch && !batch_items.is_empty() { + // One spawn_blocking for all shard reads. + let requests: Vec<_> = batch_items.iter().map(|(_, p, off, len)| (p.clone(), *off, *len)).collect(); + let batch_results = batch_shard_pread(requests).await; + + for (i, (idx, _, _, _)) in batch_items.iter().enumerate() { + match &batch_results[i] { + Ok(bytes) => { + use crate::erasure_coding::BitrotReader; + let rd = std::io::Cursor::new(bytes.clone()); + readers[*idx] = Some(BitrotReader::new( + Box::new(rd) as Box, + erasure.shard_size(), + checksum_algo.clone(), + skip_verify_bitrot, + )); + errors[*idx] = None; + } + Err(e) => { + errors[*idx] = Some(e.clone()); + } + } + } + return (readers, errors); } - Err(e) => { - readers.push(None); - errors.push(Some(e)); + } + + // Sequential fallback (existing logic, unchanged). + let mut readers = Vec::with_capacity(disks.len()); + let mut errors = Vec::with_capacity(disks.len()); + for (idx, disk_op) in disks.iter().enumerate() { + match create_bitrot_reader( + files[idx].data.as_deref(), + disk_op.as_ref(), + bucket, + &format!("{}/{}/part.{}", object, files[idx].data_dir.unwrap_or_default(), part_number), + read_offset, + till_offset.saturating_sub(read_offset), + erasure.shard_size(), + checksum_algo.clone(), + skip_verify_bitrot, + use_zero_copy, + ) + .await + { + Ok(Some(r)) => { + readers.push(Some(r)); + errors.push(None); + } + Ok(None) => { + readers.push(None); + errors.push(Some(DiskError::DiskNotFound)); + } + Err(e) => { + readers.push(None); + errors.push(Some(e)); + } } } + (readers, errors) } + .instrument(debug_span!(target: "rustfs_get_trace", "get.shard_setup")) + .await; let nil_count = errors.iter().filter(|&e| e.is_none()).count(); if nil_count < erasure.data_shards { diff --git a/rustfs/src/app/object_usecase.rs b/rustfs/src/app/object_usecase.rs index 2af55bef8c..77bf59070b 100644 --- a/rustfs/src/app/object_usecase.rs +++ b/rustfs/src/app/object_usecase.rs @@ -129,7 +129,7 @@ use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; use tokio_tar::Archive; use tokio_util::io::{ReaderStream, StreamReader}; -use tracing::{debug, error, info, instrument, warn}; +use tracing::{Instrument, debug, debug_span, error, info, instrument, warn}; use uuid::Uuid; const ACCEPT_RANGES_BYTES: &str = "bytes"; @@ -2317,7 +2317,9 @@ impl DefaultObjectUsecase { let helper = OperationHelper::new(&req, EventName::ObjectAccessedGet, S3Operation::GetObject).suppress_event(); // mc get 3 - let request_context = Self::prepare_get_object_request_context(&req).await?; + let request_context = Self::prepare_get_object_request_context(&req) + .instrument(debug_span!(target: "rustfs_get_trace", "get.pre_dispatch")) + .await?; let GetObjectRequestContext { bucket, key, diff --git a/scripts/perf/analyze/analyze.py b/scripts/perf/analyze/analyze.py index 5849df285e..e45f2b0a6a 100644 --- a/scripts/perf/analyze/analyze.py +++ b/scripts/perf/analyze/analyze.py @@ -754,7 +754,7 @@ def render_report_md(data: dict) -> str: disk_rows = [] for node_label, stats in sorted(dsk.items()): if not stats: - disk_rows.append([node_label, "—", "—", "—", "—", "—", "—", "—", "—"]) + disk_rows.append([node_label, "—", "—", "—", "—", "—", "—", "—"]) continue for dev in stats["devices"]: disk_rows.append([ diff --git a/scripts/perf/run-perf-test.sh b/scripts/perf/run-perf-test.sh index f3bf6dd9d0..a2a6ee799f 100755 --- a/scripts/perf/run-perf-test.sh +++ b/scripts/perf/run-perf-test.sh @@ -147,7 +147,7 @@ fi # --------------------------------------------------------------------------- if $TRACE; then - RUST_LOG='error,rustfs_put_trace=debug,rustfs_lock_rpc=debug,h2=warn,hyper=warn,tonic=warn,reqwest=warn,tower=warn' + RUST_LOG='error,rustfs_put_trace=debug,rustfs_get_trace=debug,rustfs_lock_rpc=debug,h2=warn,hyper=warn,tonic=warn,reqwest=warn,tower=warn' log "Trace mode enabled — RUST_LOG set to debug recipe" else RUST_LOG='error'