Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
276 changes: 251 additions & 25 deletions pageserver/src/tenant/ephemeral_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@ use crate::tenant::storage_layer::inmemory_layer::vectored_dio_read::File;
use crate::virtual_file::owned_buffers_io::io_buf_aligned::IoBufAlignedMut;
use crate::virtual_file::owned_buffers_io::slice::SliceMutExt;
use crate::virtual_file::owned_buffers_io::write::{Buffer, FlushTaskError};
use crate::virtual_file::{self, IoBufferMut, TempVirtualFile, VirtualFile, owned_buffers_io};
use crate::virtual_file::{self, IoBuffer, IoBufferMut, TempVirtualFile, VirtualFile, owned_buffers_io};

use self::owned_buffers_io::write::OwnedAsyncWriter;
use self::owned_buffers_io::io_buf_ext::FullSlice;
use arc_swap::ArcSwap;

pub struct EphemeralFile {
_tenant_shard_id: TenantShardId,
Expand All @@ -38,6 +40,10 @@ pub struct EphemeralFile {
bytes_written: AtomicU64,

resource_units: std::sync::Mutex<GlobalResourceUnits>,

/// A handle to the buffer that is currently being flushed.
/// This allows to read from it without holding the buffered_writer lock.
maybe_flushed_buffer: Arc<ArcSwap<Option<(u64, FullSlice<IoBuffer>)>>>,
}

type BufferedWriter = owned_buffers_io::write::BufferedWriter<
Expand Down Expand Up @@ -93,24 +99,31 @@ impl EphemeralFile {
gate.enter()?,
);

let writer = BufferedWriter::new(
file.clone(),
0,
|| IoBufferMut::with_capacity(TAIL_SZ),
gate.enter()?,
cancel.clone(),
ctx,
tracing::Span::current(),
);

let maybe_flushed_buffer = writer.maybe_flushed_arc();

let page_cache_file_id = page_cache::next_file_id(); // XXX get rid, we're not page-caching anymore

Ok(EphemeralFile {
_tenant_shard_id: tenant_shard_id,
_timeline_id: timeline_id,
page_cache_file_id,
file: file.clone(),
buffered_writer: tokio::sync::RwLock::new(BufferedWriter::new(
file,
0,
|| IoBufferMut::with_capacity(TAIL_SZ),
gate.enter()?,
cancel.child_token(),
ctx,
info_span!(parent: None, "ephemeral_file_buffered_writer", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), timeline_id=%timeline_id, path = %filename),
)),
file,
buffered_writer: tokio::sync::RwLock::new(writer),
maybe_flushed_buffer,
bytes_written: AtomicU64::new(0),
resource_units: std::sync::Mutex::new(GlobalResourceUnits::new()),
resource_units: std::sync::Mutex::new(
crate::tenant::storage_layer::inmemory_layer::GlobalResourceUnits::new(),
),
})
}
}
Expand Down Expand Up @@ -254,6 +267,26 @@ impl super::storage_layer::inmemory_layer::vectored_dio_read::File for Ephemeral
// TODO(vlad): Is there a nicer way of doing this?
dst.as_mut_rust_slice_full_zeroed();

let read_len = dst.bytes_total();
let end = start + read_len as u64;

// 1. Try lock-less read from maybe_flushed_buffer
{
let guard = self.maybe_flushed_buffer.load();
if let Some((offset, buf)) = guard.as_ref() {
let buf_start = *offset;
let buf_end = buf_start + buf.len() as u64;

// If the read is fully contained in the flushed buffer, serve it lock-lessly.
if start >= buf_start && end <= buf_end {
let src_start = (start - buf_start) as usize;
let src_end = src_start + read_len;
dst.as_mut_rust_slice_full_zeroed().copy_from_slice(&buf[src_start..src_end]);
return Ok((dst, read_len));
}
}
}

let writer = self.buffered_writer.read().await;

// Read bytes written while under lock. This is a hack to deal with concurrent
Expand All @@ -276,7 +309,8 @@ impl super::storage_layer::inmemory_layer::vectored_dio_read::File for Ephemeral
};

let submitted_offset = writer.bytes_submitted();
let maybe_flushed = writer.inspect_maybe_flushed();
let maybe_flushed_arc = writer.inspect_maybe_flushed().expect("always returns Some");
let maybe_flushed = maybe_flushed_arc.as_ref();

let mutable = match writer.inspect_mutable() {
Some(mutable) => &mutable[0..mutable.pending()],
Expand All @@ -301,19 +335,26 @@ impl super::storage_layer::inmemory_layer::vectored_dio_read::File for Ephemeral
}

let (written_range, maybe_flushed_range) = {
if maybe_flushed.is_some() {
if let Some((offset, buf)) = maybe_flushed {
let buf_start = *offset;
let buf_end = buf_start + buf.len() as u64;
// [ written ][ maybe_flushed ][ mutable ]
// ^ ^
// buf_start buf_end
// ^
// `submitted_offset`
// <++++++ on disk +++++++????????????????>
//
// Note: buf_end should equal submitted_offset if the flush loop is keeping up.
// But maybe_flushed is a snapshot.

(
Range(
start,
std::cmp::min(end, submitted_offset.saturating_sub(TAIL_SZ as u64)),
std::cmp::min(end, buf_start),
),
Range(
std::cmp::max(start, submitted_offset.saturating_sub(TAIL_SZ as u64)),
std::cmp::min(end, submitted_offset),
std::cmp::max(start, buf_start),
std::cmp::min(end, buf_end),
),
)
} else {
Expand Down Expand Up @@ -363,14 +404,16 @@ impl super::storage_layer::inmemory_layer::vectored_dio_read::File for Ephemeral
};

let dst = if maybe_flushed_range.len() > 0 {
let (offset, buf) = maybe_flushed.as_ref().unwrap();
let buf_start = *offset;

let offset_in_buffer = maybe_flushed_range
.0
.checked_sub(submitted_offset.saturating_sub(TAIL_SZ as u64))
.checked_sub(buf_start)
.unwrap()
.into_usize();
// Checked previously the buffer is Some.
let maybe_flushed = maybe_flushed.unwrap();
let to_copy = &maybe_flushed

let to_copy = &buf
[offset_in_buffer..(offset_in_buffer + maybe_flushed_range.len().into_usize())];
let bounds = dst.bounds();
let mut view = dst.slice({
Expand Down Expand Up @@ -544,7 +587,7 @@ mod tests {

let writer = file.buffered_writer.read().await;
let maybe_flushed_buffer_contents = writer.inspect_maybe_flushed().unwrap();
assert_eq!(&maybe_flushed_buffer_contents[..], &content[cap..cap * 2]);
assert_eq!(&maybe_flushed_buffer_contents.as_ref().as_ref().unwrap().1[..], &content[cap..cap * 2]);

let mutable_buffer_contents = writer.mutable();
assert_eq!(mutable_buffer_contents, &content[cap * 2..write_nbytes]);
Expand Down Expand Up @@ -583,7 +626,7 @@ mod tests {
);
let writer = file.buffered_writer.read().await;
assert_eq!(
&writer.inspect_maybe_flushed().unwrap()[0..cap],
&writer.inspect_maybe_flushed().unwrap().as_ref().as_ref().unwrap().1[0..cap],
&content[cap..cap * 2]
);
assert_eq!(
Expand All @@ -592,6 +635,92 @@ mod tests {
);
}

#[tokio::test]
async fn test_concurrent_lockless_read() {
let (conf, tenant_id, timeline_id, ctx) =
harness("test_concurrent_lockless_read").unwrap();

let gate = utils::sync::gate::Gate::default();
let cancel = CancellationToken::new();
let file = Arc::new(
EphemeralFile::create(conf, tenant_id, timeline_id, &gate, &cancel, &ctx)
.await
.unwrap(),
);

let n_readers = 10;
let n_writes = 100;
let chunk_size = 1024;

let mut handles = Vec::new();

// Spawn writer
let file_writer = file.clone();
let ctx_writer = ctx.attached_child();
handles.push(tokio::spawn(async move {
use rand::SeedableRng;
let mut rng = rand::rngs::StdRng::from_os_rng();
for i in 0..n_writes {
let mut chunk = vec![0u8; chunk_size];
rng.fill(&mut chunk[..]);
// Fill with predictable data for verification
for j in 0..chunk_size {
chunk[j] = (i as u8).wrapping_add(j as u8);
}
file_writer.write_raw(&chunk, &ctx_writer).await.unwrap();
tokio::task::yield_now().await;
}
}));

// Spawn readers
for reader_idx in 0..n_readers {
let file_reader = file.clone();
let ctx_reader = ctx.attached_child();
handles.push(tokio::spawn(async move {
use rand::SeedableRng;
let mut rng = rand::rngs::StdRng::from_os_rng();
for _ in 0..n_writes * 2 {
let len = file_reader.len();
if len == 0 {
tokio::task::yield_now().await;
continue;
}

let read_len = rng.random_range(1..chunk_size * 2);
let start = rng.random_range(0..len);
let end = std::cmp::min(start + read_len as u64, len);
let actual_read_len = (end - start) as usize;

if actual_read_len == 0 {
continue;
}

let mut buf = IoBufferMut::with_capacity(actual_read_len);
let (buf_slice, n) = file_reader
.read_exact_at_eof_ok(start, buf.slice_full(), &ctx_reader)
.await
.unwrap();
assert_eq!(n, actual_read_len);

let buf = buf_slice.into_inner();

for k in 0..actual_read_len {
let absolute_pos = start + k as u64;
let chunk_idx = (absolute_pos / chunk_size as u64) as usize;
let byte_idx = (absolute_pos % chunk_size as u64) as usize;
let expected = (chunk_idx as u8).wrapping_add(byte_idx as u8);
assert_eq!(buf[k], expected, "Reader {} failed at pos {}", reader_idx, absolute_pos);
}

tokio::task::yield_now().await;
}
}));
}

for handle in handles {
handle.await.unwrap();
}
}
#[tokio::test]
async fn test_read_split_across_file_and_buffer() {
// This test exercises the logic on the read path that splits the logical read
Expand Down Expand Up @@ -673,4 +802,101 @@ mod tests {
in_progress.wait_until_flush_is_done().await;
test_read_all_offset_combinations().await;
}
}

#[tokio::test]
async fn test_multi_writer_race_condition() {
let (conf, tenant_id, timeline_id, ctx) =
harness("test_multi_writer_race_condition").unwrap();

let gate = utils::sync::gate::Gate::default();
let cancel = CancellationToken::new();
let file = Arc::new(
EphemeralFile::create(conf, tenant_id, timeline_id, &gate, &cancel, &ctx)
.await
.unwrap(),
);

let n_writers = 10;
let n_iter = 100;
let chunk_size = 1024;

let mut handles = Vec::new();
for _ in 0..n_writers {
let file = file.clone();
let ctx = ctx.attached_child();
handles.push(tokio::spawn(async move {
use rand::SeedableRng;
let mut rng = rand::rngs::StdRng::from_os_rng();
for _ in 0..n_iter {
let mut chunk = vec![0u8; chunk_size];
rng.fill(&mut chunk[..]);
file.write_raw(&chunk, &ctx).await.unwrap();
}
}));
}

for handle in handles {
handle.await.unwrap();
}

let expected_len = (n_writers * n_iter * chunk_size) as u64;
assert_eq!(file.len(), expected_len);
}

#[tokio::test]
async fn test_concurrent_flush_no_stall() {
use std::time::{Duration, Instant};
use crate::context::DownloadBehavior;
use crate::task_mgr::TaskKind;

let (conf, tenant_id, timeline_id, ctx) = harness("test_concurrent_flush_no_stall").unwrap();
let gate = utils::sync::gate::Gate::default();
let cancel = CancellationToken::new();

let file = Arc::new(
EphemeralFile::create(conf, tenant_id, timeline_id, &gate, &cancel, &ctx)
.await
.unwrap(),
);

let writer_file = file.clone();
let writer_handle = tokio::spawn(async move {
let writer_ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
let chunk = vec![0u8; TAIL_SZ];
for _ in 0..50 {
writer_file.write_raw(&chunk, &writer_ctx).await.unwrap();
tokio::task::yield_now().await;
}
});

let mut reader_handles = vec![];
for i in 0..5 {
let reader_file = file.clone();
reader_handles.push(tokio::spawn(async move {
let reader_ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
let start = Instant::now();
let mut reads = 0;
while start.elapsed() < Duration::from_secs(1) {
let mut buf = IoBufferMut::with_capacity(1024);
if reader_file
.read_exact_at_eof_ok(0, buf.slice_full(), &reader_ctx)
.await
.is_ok()
{
reads += 1;
}
}
println!("Reader {} completed {} reads", i, reads);
reads
}));
}

writer_handle.await.unwrap();
let mut total_reads = 0;
for h in reader_handles {
total_reads += h.await.unwrap();
}

assert!(total_reads > 100, "Readers were blocked by the flush lock!");
}
}
Loading