From 99f0ca4c20dde98ce3d59f9a41ae2e68a5a4fa87 Mon Sep 17 00:00:00 2001 From: Brskt Date: Tue, 28 Apr 2026 09:58:51 +0200 Subject: [PATCH 1/2] aya: add acquire fence on perf data_head load The kernel publishes new perf records via smp_store_release on data_head, so userspace must issue an acquire barrier after the data_head load and before reading the record bytes. Without it, weakly ordered targets (arm64, ppc64, riscv) may speculatively read bytes that have not yet been published. The existing SeqCst fence before the data_tail store is the release-side barrier; it does nothing for head-side ordering. See the comment on `data_head` in include/uapi/linux/perf_event.h and perf_event_open(2). --- aya/src/maps/perf/perf_buffer.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/aya/src/maps/perf/perf_buffer.rs b/aya/src/maps/perf/perf_buffer.rs index 02a46e9fa..353c1a13b 100644 --- a/aya/src/maps/perf/perf_buffer.rs +++ b/aya/src/maps/perf/perf_buffer.rs @@ -220,6 +220,9 @@ impl PerfBuffer { let head = unsafe { (*header).data_head } as usize; let mut tail = unsafe { (*header).data_tail } as usize; + // Pair with the kernel's smp_store_release on `data_head`: subsequent + // record reads must not be reordered before observing the head. + atomic::fence(Ordering::Acquire); let result = loop { if head == tail { break Ok(()); From 4ba65d246b6b260de529677e1ffd0121b91f31d4 Mon Sep 17 00:00:00 2001 From: Brskt Date: Mon, 27 Apr 2026 22:03:35 +0200 Subject: [PATCH 2/2] aya, integration-test: replace read_events with try_fold/fold/for_each Replace `PerfEventArrayBuffer::read_events` with closure-based `try_fold`, `fold`, and `for_each`. The closure receives `PerfEvent<'_>`, which is `Sample { head, tail }` (slices borrowed from the kernel-mapped ring buffer; `tail` is empty unless the sample straddles the wrap boundary) or `Lost { count }`. The drain accumulates per-event `data_tail` advances locally and emits a single release-store on scope exit via `scopeguard::guard`, amortizing the kernel-visible barrier and remaining panic-safe. Read `data_head` with `ptr::read_volatile` + `Acquire` fence to pair with the kernel's `smp_wmb()` + `WRITE_ONCE()` publish. Read `data_tail` with a plain field access since userspace is the sole writer. Write `data_tail` with `ptr::write_volatile` + `SeqCst` fence to pair with the kernel's `READ_ONCE()`. Remove `Events`, `PerfBufferError::NoBuffers`, `PerfBufferError::MoreSpaceNeeded`, and the `bytes` dependency. Migrate the perf event array integration test to the new API: the closure destructures `PerfEvent::Sample { head, .. }` directly, dropping the `BytesMut` canary and `PADDED_SAMPLE_SIZE` plumbing. --- aya/Cargo.toml | 2 +- aya/src/maps/perf/perf_buffer.rs | 577 ++++++++---------- aya/src/maps/perf/perf_event_array.rs | 74 ++- test/integration-test/Cargo.toml | 1 - .../src/tests/perf_event_array.rs | 32 +- xtask/public-api/aya.txt | 38 +- 6 files changed, 350 insertions(+), 374 deletions(-) diff --git a/aya/Cargo.toml b/aya/Cargo.toml index 41149f9be..d4134686a 100644 --- a/aya/Cargo.toml +++ b/aya/Cargo.toml @@ -20,7 +20,6 @@ workspace = true assert_matches = { workspace = true } aya-obj = { path = "../aya-obj", version = "^0.2.2" } bitflags = { workspace = true } -bytes = { workspace = true } # TODO(https://github.com/rust-lang/rust/issues/60896): Remove once # `std::collections::hash_set::Entry` is stabilized. hashbrown = { workspace = true, features = ["default-hasher", "equivalent"] } @@ -28,6 +27,7 @@ libc = { workspace = true } log = { workspace = true } object = { workspace = true, features = ["elf", "read_core", "std", "write"] } once_cell = { workspace = true } +scopeguard = { workspace = true } thiserror = { workspace = true } [dev-dependencies] diff --git a/aya/src/maps/perf/perf_buffer.rs b/aya/src/maps/perf/perf_buffer.rs index 353c1a13b..93e2ef5fb 100644 --- a/aya/src/maps/perf/perf_buffer.rs +++ b/aya/src/maps/perf/perf_buffer.rs @@ -1,7 +1,10 @@ use std::{ + convert::Infallible, io, + ops::ControlFlow, os::fd::{AsFd, BorrowedFd}, - ptr, slice, + ptr::{self, NonNull}, + slice, sync::atomic::{self, Ordering}, }; @@ -9,7 +12,6 @@ use aya_obj::generated::{ PERF_FLAG_FD_CLOEXEC, perf_event_header, perf_event_mmap_page, perf_event_type::{PERF_RECORD_LOST, PERF_RECORD_SAMPLE}, }; -use bytes::BytesMut; use libc::{MAP_SHARED, PROT_READ, PROT_WRITE}; use thiserror::Error; @@ -55,34 +57,41 @@ pub enum PerfBufferError { io_error: io::Error, }, - /// `read_events()` was called with no output buffers. - #[error("read_events() was called with no output buffers")] - NoBuffers, - - /// `read_events()` was called with a buffer that is not large enough to - /// contain the next event in the perf buffer. - #[deprecated( - since = "0.10.8", - note = "read_events() now calls BytesMut::reserve() internally, so this error is never returned" - )] - #[error("the buffer needs to be of at least {size} bytes")] - MoreSpaceNeeded { - /// expected size - size: usize, - }, - /// An IO error occurred. #[error(transparent)] IOError(#[from] io::Error), } -/// Return type of `read_events()`. -#[derive(Debug, PartialEq, Eq)] -pub struct Events { - /// The number of events read. - pub read: usize, - /// The number of events lost. - pub lost: usize, +/// An event read from a perf event array buffer. +/// +/// Yielded by [`PerfEventArrayBuffer::try_fold`], [`PerfEventArrayBuffer::fold`], +/// and [`PerfEventArrayBuffer::for_each`]. +/// +/// [`PerfEventArrayBuffer::try_fold`]: super::PerfEventArrayBuffer::try_fold +/// [`PerfEventArrayBuffer::fold`]: super::PerfEventArrayBuffer::fold +/// [`PerfEventArrayBuffer::for_each`]: super::PerfEventArrayBuffer::for_each +#[derive(Debug)] +pub enum PerfEvent<'a> { + /// A sample emitted by `bpf_perf_event_output()`. The bytes are exposed + /// as up to two slices borrowed directly from the kernel-mapped ring + /// buffer; the second slice is empty for samples that fit contiguously, + /// and both are populated when a sample straddles the ring boundary. The + /// bytes include any kernel-side alignment padding that follows the + /// payload. + #[doc(alias = "PERF_RECORD_SAMPLE")] + Sample { + /// First chunk of the sample, or the entire sample when it does not wrap. + head: &'a [u8], + /// Second chunk; empty unless the sample straddles the ring boundary. + tail: &'a [u8], + }, + /// A signal from the kernel that samples were dropped because the ring + /// buffer was full. + #[doc(alias = "PERF_RECORD_LOST")] + Lost { + /// Number of dropped samples. + count: u64, + }, } #[cfg_attr(test, derive(Debug))] @@ -135,131 +144,189 @@ impl PerfBuffer { Ok(perf_buf) } - const fn buf(&self) -> ptr::NonNull { + const fn buf(&self) -> NonNull { self.mmap.ptr().cast() } - pub(crate) fn readable(&self) -> bool { - let header = self.buf().as_ptr(); - let head = unsafe { (*header).data_head } as usize; - let tail = unsafe { (*header).data_tail } as usize; - head != tail + /// Read `data_head`, the kernel's producer position. Pairs with the + /// kernel's `smp_wmb() + WRITE_ONCE()` publish [1]. + /// + /// [1]: https://github.com/torvalds/linux/blob/05f7e89a/kernel/events/ring_buffer.c#L113-L114 + fn data_head(&self) -> u64 { + // SAFETY: `self.buf()` points to the mmap'd `perf_event_mmap_page` for + // the lifetime of `self`, so the field projection is in-bounds and + // aligned. + let ptr = unsafe { &raw const (*self.buf().as_ptr()).data_head }; + // SAFETY: the kernel writes `data_head` concurrently; a plain read + // would be UB. + let value = unsafe { ptr::read_volatile(ptr) }; + atomic::fence(Ordering::Acquire); + value } - pub(crate) fn read_events( - &mut self, - buffers: &mut [BytesMut], - ) -> Result { - if buffers.is_empty() { - return Err(PerfBufferError::NoBuffers); - } - let header = self.buf().as_ptr(); - let base = unsafe { header.byte_add(self.page_size) }; + /// Read `data_tail`, the userspace consumer position. Userspace is the + /// sole writer, so a plain read suffices. + fn data_tail(&self) -> u64 { + // SAFETY: `self.buf()` points to the mmap'd `perf_event_mmap_page` + // for the lifetime of `self`. Only userspace writes this field, so + // there is no concurrent kernel write to race with. + unsafe { (*self.buf().as_ptr()).data_tail } + } - let mut events = Events { read: 0, lost: 0 }; - let mut buf_n = 0; + /// Mutable pointer to `data_tail`, used by [`try_fold`] to commit the + /// local position with a release-store on scope exit. + /// + /// [`try_fold`]: Self::try_fold + fn data_tail_ptr(&mut self) -> NonNull { + // SAFETY: `self.buf()` points to the mmap'd `perf_event_mmap_page` for + // the lifetime of `self`, so the field projection is in-bounds and + // aligned. + let ptr = unsafe { &raw mut (*self.buf().as_ptr()).data_tail }; + // SAFETY: `self.buf()` is `NonNull`, so a field projection within it + // is non-null. + unsafe { NonNull::new_unchecked(ptr) } + } - let fill_buf = |start_off, base: *const u8, mmap_size, out_buf: &mut [u8]| { - let len = out_buf.len(); + /// Pointer to the start of the data pages following the header page. + const fn data_pages(&self) -> *const u8 { + // SAFETY: the mmap spans `page_size + size` bytes; the data area + // starts at offset `page_size`. + unsafe { self.buf().as_ptr().byte_add(self.page_size) }.cast::() + } - let end = (start_off + len) % mmap_size; - let start = start_off % mmap_size; + pub(crate) fn readable(&self) -> bool { + self.data_head() != self.data_tail() + } - if start < end { - out_buf.copy_from_slice(unsafe { slice::from_raw_parts(base.add(start), len) }); - } else { - let size = mmap_size - start; + pub(crate) fn try_fold(&mut self, init: C, mut f: F) -> ControlFlow + where + F: FnMut(C, PerfEvent<'_>) -> ControlFlow, + { + let base = self.data_pages(); + let mmap_size = self.size; + let data_head = self.data_head(); + let initial_tail = self.data_tail(); + let data_tail_ptr = self.data_tail_ptr(); + + // Defer the userspace release-store of `data_tail` to scope exit so + // it pairs with the kernel's `READ_ONCE()` [1] in one SeqCst barrier + // instead of per event, and remains panic-safe. + // + // [1]: https://github.com/torvalds/linux/blob/05f7e89a/kernel/events/ring_buffer.c#L202 + let mut guard = scopeguard::guard(initial_tail, |tail| { + if tail != initial_tail { + atomic::fence(Ordering::SeqCst); + // SAFETY: `data_tail_ptr` was derived from a `&mut PerfBuffer` + // outliving this scope; userspace is the sole writer of + // `data_tail`. unsafe { - out_buf[..size].copy_from_slice(slice::from_raw_parts(base.add(start), size)); - out_buf[size..].copy_from_slice(slice::from_raw_parts(base, len - size)); + ptr::write_volatile(data_tail_ptr.as_ptr(), tail); } } - }; - - let read_event = |event_start, event_type, base, buf: &mut BytesMut| { - let sample_size = match event_type { - x if x == PERF_RECORD_SAMPLE as u32 || x == PERF_RECORD_LOST as u32 => { - let mut size = [0u8; size_of::()]; - fill_buf( - event_start + size_of::(), - base, - self.size, - &mut size, - ); - u32::from_ne_bytes(size) - } - _ => return Ok(None), - } as usize; - - let sample_start = - (event_start + size_of::() + size_of::()) % self.size; + }); - match event_type { + let tail = &mut *guard; + + let mut acc = init; + while *tail != data_head { + let event_start = (*tail % mmap_size as u64) as usize; + debug_assert_eq!( + event_start % 8, + 0, + "perf records are 8-byte aligned (event_start={event_start})" + ); + // SAFETY: the kernel pads each record to 8 bytes [1] and the + // buffer is page-aligned, so `base + event_start` is aligned for + // `perf_event_header` and the 8-byte header fits before the wrap + // boundary. + // + // [1]: https://github.com/torvalds/linux/blob/05f7e89a/kernel/events/core.c#L8451 + let event: perf_event_header = unsafe { ptr::read(base.add(event_start).cast()) }; + *tail = tail.wrapping_add(u64::from(event.size)); + + let perf_event = match event.type_ { x if x == PERF_RECORD_SAMPLE as u32 => { - buf.clear(); - buf.reserve(sample_size); - unsafe { buf.set_len(sample_size) } - - fill_buf(sample_start, base, self.size, buf); - - Ok(Some((1, 0))) + // The `u32` size prefix follows the header at an 8-aligned + // offset, so it never spans the wrap. + let size_offset = (event_start + size_of::()) % mmap_size; + // SAFETY: same kernel-alignment guarantee as the header read. + let sample_size: u32 = unsafe { ptr::read(base.add(size_offset).cast()) }; + let sample_size = sample_size as usize; + + // The sample payload follows the size prefix and may span + // the wrap. + let sample_start = + (event_start + size_of::() + size_of::()) + % mmap_size; + debug_assert!(sample_size <= mmap_size); + let (head, tail) = if let Some(second) = + (sample_start + sample_size).checked_sub(mmap_size) + { + let first = mmap_size - sample_start; + // SAFETY: `[sample_start, mmap_size)` and `[0, second)` + // are disjoint ranges within the mmap region; the + // kernel will not overwrite them until `data_tail` + // is committed on scope exit. + let head = unsafe { slice::from_raw_parts(base.add(sample_start), first) }; + let tail = unsafe { slice::from_raw_parts(base, second) }; + (head, tail) + } else { + // SAFETY: same as above; `sample_start + sample_size + // <= mmap_size`, contiguous within mmap. + let head = + unsafe { slice::from_raw_parts(base.add(sample_start), sample_size) }; + (head, &[][..]) + }; + PerfEvent::Sample { head, tail } } x if x == PERF_RECORD_LOST as u32 => { - let mut count = [0u8; size_of::()]; - fill_buf( - event_start + size_of::() + size_of::(), - base, - self.size, - &mut count, - ); - Ok(Some((0, u64::from_ne_bytes(count) as usize))) + // `PERF_RECORD_LOST` layout is + // `{ header, u64 id, u64 lost, sample_id }` [1]; skip past + // `id` to read the `lost` count. + // + // [1]: https://github.com/torvalds/linux/blob/05f7e89a/include/uapi/linux/perf_event.h#L906-L914 + let lost_offset = + (event_start + size_of::() + size_of::()) + % mmap_size; + // SAFETY: same kernel-alignment guarantee as the header read; the + // fixed 8-byte field at an 8-aligned offset cannot span the wrap. + let count: u64 = unsafe { ptr::read(base.add(lost_offset).cast()) }; + PerfEvent::Lost { count } } - _ => Ok(None), - } - }; + event_type => { + // `PerfBuffer::open` configures `SoftwareEvent::BpfOutput` + // with no side-band attr flags [1]; the kernel only emits + // SAMPLE and LOST. + // + // [1]: https://github.com/torvalds/linux/blob/05f7e89a/kernel/events/core.c#L5182-L5200 + debug_assert!(false, "unexpected perf record type: {event_type}"); + continue; + } + }; - let head = unsafe { (*header).data_head } as usize; - let mut tail = unsafe { (*header).data_tail } as usize; - // Pair with the kernel's smp_store_release on `data_head`: subsequent - // record reads must not be reordered before observing the head. - atomic::fence(Ordering::Acquire); - let result = loop { - if head == tail { - break Ok(()); - } - if buf_n == buffers.len() { - break Ok(()); + match f(acc, perf_event) { + ControlFlow::Continue(next) => acc = next, + ControlFlow::Break(v) => return ControlFlow::Break(v), } + } - let buf = &mut buffers[buf_n]; - - let event_start = tail % self.size; - let event: perf_event_header = - unsafe { ptr::read_unaligned(base.byte_add(event_start).cast()) }; - let event_size = event.size as usize; - - match read_event(event_start, event.type_, base.cast(), buf) { - Ok(Some((read, lost))) => { - if read > 0 { - buf_n += 1; - events.read += read; - } - events.lost += lost; - } - Ok(None) => { /* skip unknown event type */ } - Err(e) => { - // we got an error and we didn't process any events, propagate the error - // and give the caller a chance to increase buffers - break Err(e); - } - } - tail += event_size; - }; + ControlFlow::Continue(acc) + } - atomic::fence(Ordering::SeqCst); - unsafe { (*header).data_tail = tail as u64 } + pub(crate) fn fold(&mut self, init: C, mut f: F) -> C + where + F: FnMut(C, PerfEvent<'_>) -> C, + { + let ControlFlow::Continue(acc) = self + .try_fold::(init, |acc, event| ControlFlow::Continue(f(acc, event))); + acc + } - result.map(|()| events) + pub(crate) fn for_each(&mut self, mut f: F) + where + F: FnMut(PerfEvent<'_>), + { + self.fold((), |(), event| f(event)) } } @@ -281,6 +348,7 @@ mod tests { use std::fmt::Debug; use assert_matches::assert_matches; + use test_case::test_case; use super::*; use crate::sys::{Syscall, TEST_MMAP_RET, override_syscall}; @@ -325,34 +393,6 @@ mod tests { ); } - #[test] - fn test_no_out_bufs() { - let mut mmapped_buf = MMappedBuf { - data: [0; PAGE_SIZE * 2], - }; - - fake_mmap(&mut mmapped_buf); - let mut buf = PerfBuffer::open(1, PAGE_SIZE, 1).unwrap(); - - assert_matches!(buf.read_events(&mut []), Err(PerfBufferError::NoBuffers)) - } - - #[test] - fn test_no_events() { - let mut mmapped_buf = MMappedBuf { - data: [0; PAGE_SIZE * 2], - }; - - fake_mmap(&mut mmapped_buf); - let mut buf = PerfBuffer::open(1, PAGE_SIZE, 1).unwrap(); - - let out_buf = BytesMut::with_capacity(4); - assert_eq!( - buf.read_events(&mut [out_buf]).unwrap(), - Events { read: 0, lost: 0 } - ); - } - fn write(mmapped_buf: &mut MMappedBuf, offset: usize, value: T) -> usize { let dst: *mut _ = mmapped_buf; let head = offset + size_of::(); @@ -363,45 +403,9 @@ mod tests { head } - #[test] - fn test_read_first_lost() { - #[repr(C)] - #[derive(Debug)] - struct LostSamples { - header: perf_event_header, - id: u64, - count: u64, - } - - let mut mmapped_buf = MMappedBuf { - data: [0; PAGE_SIZE * 2], - }; - - write( - &mut mmapped_buf, - 0, - LostSamples { - header: perf_event_header { - type_: PERF_RECORD_LOST as u32, - misc: 0, - size: size_of::() as u16, - }, - id: 1, - count: 0xCAFEBABE, - }, - ); - - fake_mmap(&mut mmapped_buf); - let mut buf = PerfBuffer::open(1, PAGE_SIZE, 1).unwrap(); - - let out_buf = BytesMut::with_capacity(0); - let events = buf.read_events(&mut [out_buf]).unwrap(); - assert_eq!(events.lost, 0xCAFEBABE); - } - #[repr(C)] #[derive(Debug)] - struct PerfSample { + struct TestPerfRecord { s_hdr: Sample, value: T, } @@ -410,12 +414,12 @@ mod tests { write( mmapped_buf, offset, - PerfSample { + TestPerfRecord { s_hdr: Sample { header: perf_event_header { type_: PERF_RECORD_SAMPLE as u32, misc: 0, - size: size_of::>() as u16, + size: size_of::>() as u16, }, size: size_of::() as u32, }, @@ -428,166 +432,127 @@ mod tests { u32::from_ne_bytes(buf[..4].try_into().unwrap()) } - fn u64_from_buf(buf: &[u8]) -> u64 { - u64::from_ne_bytes(buf[..8].try_into().unwrap()) - } - - #[test] - fn test_read_first_sample() { - let mut mmapped_buf = MMappedBuf { - data: [0; PAGE_SIZE * 2], - }; - - write_sample(&mut mmapped_buf, 0, 0xCAFEBABEu32); - - fake_mmap(&mut mmapped_buf); - let mut buf = PerfBuffer::open(1, PAGE_SIZE, 1).unwrap(); - - let mut out_bufs = [BytesMut::with_capacity(4)]; - - let events = buf.read_events(&mut out_bufs).unwrap(); - assert_eq!(events, Events { lost: 0, read: 1 }); - assert_eq!(u32_from_buf(&out_bufs[0]), 0xCAFEBABE); - } - - #[test] - fn test_read_many_with_many_reads() { - let mut mmapped_buf = MMappedBuf { - data: [0; PAGE_SIZE * 2], - }; - - let next = write_sample(&mut mmapped_buf, 0, 0xCAFEBABEu32); - write_sample(&mut mmapped_buf, next, 0xBADCAFEu32); - - fake_mmap(&mut mmapped_buf); - let mut buf = PerfBuffer::open(1, PAGE_SIZE, 1).unwrap(); - - let mut out_bufs = [BytesMut::with_capacity(4)]; - - let events = buf.read_events(&mut out_bufs).unwrap(); - assert_eq!(events, Events { lost: 0, read: 1 }); - assert_eq!(u32_from_buf(&out_bufs[0]), 0xCAFEBABE); - - let events = buf.read_events(&mut out_bufs).unwrap(); - assert_eq!(events, Events { lost: 0, read: 1 }); - assert_eq!(u32_from_buf(&out_bufs[0]), 0xBADCAFE); + fn sample_bytes(head: &[u8], tail: &[u8]) -> Vec { + let mut v = Vec::with_capacity(head.len() + tail.len()); + v.extend_from_slice(head); + v.extend_from_slice(tail); + v } - #[test] - fn test_read_many_with_one_read() { + #[test_case(&[] ; "empty")] + #[test_case(&[0xCAFEBABEu32] ; "single")] + #[test_case(&[0xCAFEBABEu32, 0xBADCAFEu32] ; "consecutive")] + fn test_for_each_samples(expected: &[u32]) { let mut mmapped_buf = MMappedBuf { data: [0; PAGE_SIZE * 2], }; - let next = write_sample(&mut mmapped_buf, 0, 0xCAFEBABEu32); - write_sample(&mut mmapped_buf, next, 0xBADCAFEu32); + let mut offset = 0; + for &v in expected { + offset = write_sample(&mut mmapped_buf, offset, v); + } fake_mmap(&mut mmapped_buf); let mut buf = PerfBuffer::open(1, PAGE_SIZE, 1).unwrap(); - let mut out_bufs = std::iter::repeat_n(BytesMut::with_capacity(4), 3).collect::>(); - - let events = buf.read_events(&mut out_bufs).unwrap(); - assert_eq!(events, Events { lost: 0, read: 2 }); - assert_eq!(u32_from_buf(&out_bufs[0]), 0xCAFEBABE); - assert_eq!(u32_from_buf(&out_bufs[1]), 0xBADCAFE); + let mut payloads = Vec::new(); + buf.for_each(|event| match event { + PerfEvent::Sample { head, tail } => { + payloads.push(u32_from_buf(&sample_bytes(head, tail))); + } + PerfEvent::Lost { count } => panic!("unexpected lost: {count}"), + }); + assert_eq!(payloads, expected); } #[test] - fn test_read_last_sample() { - let mut mmapped_buf = MMappedBuf { - data: [0; PAGE_SIZE * 2], - }; - - let offset = PAGE_SIZE - size_of::>(); - write_sample(&mut mmapped_buf, offset, 0xCAFEBABEu32); - mmapped_buf.mmap_page.data_tail = offset as u64; - - fake_mmap(&mut mmapped_buf); - let mut buf = PerfBuffer::open(1, PAGE_SIZE, 1).unwrap(); - - let mut out_bufs = [BytesMut::with_capacity(4)]; - - let events = buf.read_events(&mut out_bufs).unwrap(); - assert_eq!(events, Events { lost: 0, read: 1 }); - assert_eq!(u32_from_buf(&out_bufs[0]), 0xCAFEBABE); - } + fn test_for_each_lost() { + #[repr(C)] + #[derive(Debug)] + struct LostSamples { + header: perf_event_header, + id: u64, + count: u64, + } - #[test] - fn test_read_wrapping_sample_size() { let mut mmapped_buf = MMappedBuf { data: [0; PAGE_SIZE * 2], }; - let offset = PAGE_SIZE - size_of::() - 2; write( &mut mmapped_buf, - offset, - perf_event_header { - type_: PERF_RECORD_SAMPLE as u32, - misc: 0, - size: size_of::>() as u16, + 0, + LostSamples { + header: perf_event_header { + type_: PERF_RECORD_LOST as u32, + misc: 0, + size: size_of::() as u16, + }, + id: 1, + count: 0xCAFEBABE, }, ); - mmapped_buf.mmap_page.data_tail = offset as u64; - - let (left, right) = if cfg!(target_endian = "little") { - (0x0004u16, 0x0000u16) - } else { - (0x0000u16, 0x0004u16) - }; - write(&mut mmapped_buf, PAGE_SIZE - 2, left); - write(&mut mmapped_buf, 0, right); - write(&mut mmapped_buf, 2, 0xBAADCAFEu32); fake_mmap(&mut mmapped_buf); let mut buf = PerfBuffer::open(1, PAGE_SIZE, 1).unwrap(); - let mut out_bufs = [BytesMut::with_capacity(8)]; - - let events = buf.read_events(&mut out_bufs).unwrap(); - assert_eq!(events, Events { lost: 0, read: 1 }); - assert_eq!(u32_from_buf(&out_bufs[0]), 0xBAADCAFE); + let mut events = Vec::new(); + buf.for_each(|event| match event { + PerfEvent::Sample { .. } => panic!("unexpected sample"), + PerfEvent::Lost { count } => events.push(count), + }); + assert_eq!(events, [0xCAFEBABE]); } - #[test] - fn test_read_wrapping_value() { - let mut mmapped_buf = MMappedBuf { - data: [0; PAGE_SIZE * 2], - }; - + // The `write` helper sets `data_head` to the post-write absolute byte + // offset, which is convenient for non-wrapping records but does not match + // the kernel-side semantics of monotonic `data_head`/`data_tail` counters. + // Wrap fixtures fix `data_head` up afterwards to match the real kernel + // contract (tail + total_event_size). + fn fixture_wrap_data(mmapped_buf: &mut MMappedBuf) -> &'static [u8] { let (left, right) = if cfg!(target_endian = "little") { (0xCAFEBABEu32, 0xBAADCAFEu32) } else { (0xBAADCAFEu32, 0xCAFEBABEu32) }; - - let offset = PAGE_SIZE - size_of::>(); + let offset = PAGE_SIZE - size_of::>(); write( - &mut mmapped_buf, + mmapped_buf, offset, - PerfSample { + TestPerfRecord { s_hdr: Sample { header: perf_event_header { type_: PERF_RECORD_SAMPLE as u32, misc: 0, - size: size_of::>() as u16, + size: size_of::>() as u16, }, size: size_of::() as u32, }, value: left, }, ); - write(&mut mmapped_buf, 0, right); + write(mmapped_buf, 0, right); mmapped_buf.mmap_page.data_tail = offset as u64; + mmapped_buf.mmap_page.data_head = (offset + size_of::>()) as u64; + static EXPECTED: [u8; 8] = 0xBAADCAFECAFEBABEu64.to_ne_bytes(); + &EXPECTED + } + + #[test] + fn test_for_each_wrap() { + let mut mmapped_buf = MMappedBuf { + data: [0; PAGE_SIZE * 2], + }; + let expected = fixture_wrap_data(&mut mmapped_buf); fake_mmap(&mut mmapped_buf); let mut buf = PerfBuffer::open(1, PAGE_SIZE, 1).unwrap(); - let mut out_bufs = [BytesMut::with_capacity(8)]; - - let events = buf.read_events(&mut out_bufs).unwrap(); - assert_eq!(events, Events { lost: 0, read: 1 }); - assert_eq!(u64_from_buf(&out_bufs[0]), 0xBAADCAFECAFEBABE); + let mut got: Vec = Vec::new(); + buf.for_each(|event| match event { + PerfEvent::Sample { head, tail } => got.extend(sample_bytes(head, tail)), + PerfEvent::Lost { count } => panic!("unexpected lost: {count}"), + }); + assert_eq!(&got[..expected.len()], expected); } } diff --git a/aya/src/maps/perf/perf_event_array.rs b/aya/src/maps/perf/perf_event_array.rs index d992f3d96..03dc483a3 100644 --- a/aya/src/maps/perf/perf_event_array.rs +++ b/aya/src/maps/perf/perf_event_array.rs @@ -4,18 +4,16 @@ use std::{ borrow::{Borrow, BorrowMut}, - ops::Deref as _, + ops::{ControlFlow, Deref as _}, os::fd::{AsFd, AsRawFd, BorrowedFd, RawFd}, path::Path, sync::Arc, }; -use bytes::BytesMut; - use crate::{ maps::{ MapData, MapError, PinError, - perf::{Events, PerfBuffer, PerfBufferError}, + perf::{PerfBuffer, PerfBufferError, PerfEvent}, }, sys::bpf_map_update_elem, util::page_size, @@ -39,21 +37,45 @@ impl> PerfEventArrayBuffer { self.buf.readable() } - /// Reads events from the buffer. + /// Processes events available in the buffer with `f`. /// - /// This method reads events into the provided slice of buffers, filling - /// each buffer in order stopping when there are no more events to read or - /// all the buffers have been filled. + /// For each available event, `f` receives the accumulator and event: + /// * [`ControlFlow::Continue(next)`](ControlFlow::Continue) keeps draining with `next`. + /// * [`ControlFlow::Break(break_value)`](ControlFlow::Break) stops early and returns `break_value`. /// - /// Returns the number of events read and the number of events lost. Events - /// are lost when user space doesn't read events fast enough and the ring - /// buffer fills up. + /// If the buffer is fully drained, returns [`ControlFlow::Continue`] + /// containing the final accumulator. /// - /// # Errors + /// The slices in [`PerfEvent::Sample`] are borrowed directly from the perf + /// ring buffer; the borrow is bounded by the closure invocation. The + /// kernel-visible `data_tail` is advanced once at the end of the call, + /// amortizing the `SeqCst` store across the drain. + pub fn try_fold(&mut self, init: C, f: F) -> ControlFlow + where + F: FnMut(C, PerfEvent<'_>) -> ControlFlow, + { + self.buf.try_fold(init, f) + } + + /// Processes events available in the buffer with `f`. /// - /// [`PerfBufferError::NoBuffers`] is returned when `out_bufs` is empty. - pub fn read_events(&mut self, out_bufs: &mut [BytesMut]) -> Result { - self.buf.read_events(out_bufs) + /// For each available event, `f` receives the accumulator and event, and + /// returns the next accumulator. Unlike [`PerfEventArrayBuffer::try_fold`], + /// this function cannot short-circuit: it always processes events until the + /// buffer is fully drained, then returns the final accumulator. + pub fn fold(&mut self, init: C, f: F) -> C + where + F: FnMut(C, PerfEvent<'_>) -> C, + { + self.buf.fold(init, f) + } + + /// Processes events available in the buffer with `f`. + pub fn for_each(&mut self, f: F) + where + F: FnMut(PerfEvent<'_>), + { + self.buf.for_each(f) } } @@ -78,7 +100,10 @@ impl> AsRawFd for PerfEventArrayBuffer { /// * call [`PerfEventArray::open`] /// * poll the returned [`PerfEventArrayBuffer`] to be notified when events are /// inserted in the buffer -/// * call [`PerfEventArrayBuffer::read_events`] to read the events +/// * drain events with [`PerfEventArrayBuffer::for_each`] (or [`fold`]/[`try_fold`]) +/// +/// [`fold`]: PerfEventArrayBuffer::fold +/// [`try_fold`]: PerfEventArrayBuffer::try_fold /// /// # Minimum kernel version /// @@ -90,7 +115,7 @@ impl> AsRawFd for PerfEventArrayBuffer { /// available CPU: /// /// ```no_run -/// # use aya::maps::perf::PerfEventArrayBuffer; +/// # use aya::maps::perf::{PerfEvent, PerfEventArrayBuffer}; /// # use aya::maps::MapData; /// # use std::borrow::BorrowMut; /// # struct Poll { _t: std::marker::PhantomData }; @@ -116,7 +141,6 @@ impl> AsRawFd for PerfEventArrayBuffer { /// # let mut bpf = aya::Ebpf::load(&[])?; /// use aya::maps::PerfEventArray; /// use aya::util::online_cpus; -/// use bytes::BytesMut; /// /// let mut perf_array = PerfEventArray::try_from(bpf.map_mut("EVENTS").unwrap())?; /// @@ -128,14 +152,18 @@ impl> AsRawFd for PerfEventArrayBuffer { /// perf_buffers.push(perf_array.open(cpu_id, None)?); /// } /// -/// let mut out_bufs = [BytesMut::with_capacity(1024)]; -/// /// // poll the buffers to know when they have queued events /// let poll = poll_buffers(perf_buffers); /// loop { -/// for read_buf in poll.poll_readable() { -/// read_buf.read_events(&mut out_bufs)?; -/// // process out_bufs +/// for perf_buf in poll.poll_readable() { +/// perf_buf.for_each(|event| match event { +/// PerfEvent::Sample { head, tail } => { +/// // process the sample bytes (`tail` is empty unless the sample wraps) +/// } +/// PerfEvent::Lost { count } => { +/// // record the dropped-events counter +/// } +/// }); /// } /// } /// diff --git a/test/integration-test/Cargo.toml b/test/integration-test/Cargo.toml index 241bf1ae3..81e5fec45 100644 --- a/test/integration-test/Cargo.toml +++ b/test/integration-test/Cargo.toml @@ -21,7 +21,6 @@ anyhow = { workspace = true, features = ["std"] } assert_matches = { workspace = true } aya-log = { path = "../../aya-log", version = "^0.2.2", default-features = false } aya-obj = { path = "../../aya-obj", version = "^0.2.2" } -bytes = { workspace = true } epoll = { workspace = true } futures = { workspace = true, features = ["alloc"] } integration-common = { path = "../integration-common", features = ["user"] } diff --git a/test/integration-test/src/tests/perf_event_array.rs b/test/integration-test/src/tests/perf_event_array.rs index 4c3affc04..4edfdbcb7 100644 --- a/test/integration-test/src/tests/perf_event_array.rs +++ b/test/integration-test/src/tests/perf_event_array.rs @@ -1,10 +1,9 @@ use aya::{ EbpfLoader, - maps::{PerfEventArray, perf::Events}, + maps::{PerfEventArray, perf::PerfEvent}, programs::{UProbe, uprobe::UProbeScope}, util::online_cpus, }; -use bytes::BytesMut; use test_case::test_case; #[unsafe(no_mangle)] @@ -48,29 +47,16 @@ fn emit_event(bpf_obj: &[u8], events_map: &str, prog: &str) { trigger_emit_event(); - // PERF_SAMPLE_RAW is encoded as `u32 size + data`; the kernel rounds the - // field's total to 8 bytes. `read_events` copies `size` bytes (payload + pad). - const PADDED_SAMPLE_SIZE: usize = { - let size_field = size_of::(); - (size_field + size_of::()).next_multiple_of(8) - size_field - }; - let mut payloads = Vec::new(); for buf in &mut buffers { - let mut out = [BytesMut::from(&[0xAAu8; PADDED_SAMPLE_SIZE * 2][..])]; - let tail = out[0].split_off(PADDED_SAMPLE_SIZE); - let Events { read, lost } = buf.read_events(&mut out).unwrap(); - assert_eq!(lost, 0); - assert!( - tail.iter().all(|&b| b == 0xAA), - "bytes beyond payload were overwritten", - ); - for sample in &out[..read] { - assert_eq!(sample.len(), PADDED_SAMPLE_SIZE); - payloads.push(u64::from_ne_bytes( - sample[..size_of::()].try_into().unwrap(), - )); - } + buf.for_each(|event| match event { + PerfEvent::Sample { head, .. } => { + payloads.push(u64::from_ne_bytes( + head[..size_of::()].try_into().unwrap(), + )); + } + PerfEvent::Lost { count } => panic!("kernel dropped {count} samples"), + }); } assert_eq!(payloads, [0xDEAD_BEEFu64]); } diff --git a/xtask/public-api/aya.txt b/xtask/public-api/aya.txt index d6acf99a7..eeeae67c8 100644 --- a/xtask/public-api/aya.txt +++ b/xtask/public-api/aya.txt @@ -243,9 +243,6 @@ pub aya::maps::perf::PerfBufferError::InvalidPageCount pub aya::maps::perf::PerfBufferError::InvalidPageCount::page_count: usize pub aya::maps::perf::PerfBufferError::MMapError pub aya::maps::perf::PerfBufferError::MMapError::io_error: std::io::error::Error -pub aya::maps::perf::PerfBufferError::MoreSpaceNeeded -pub aya::maps::perf::PerfBufferError::MoreSpaceNeeded::size: usize -pub aya::maps::perf::PerfBufferError::NoBuffers pub aya::maps::perf::PerfBufferError::OpenError pub aya::maps::perf::PerfBufferError::OpenError::io_error: std::io::error::Error pub aya::maps::perf::PerfBufferError::PerfEventEnableError @@ -265,22 +262,21 @@ impl core::marker::Unpin for aya::maps::perf::PerfBufferError impl core::marker::UnsafeUnpin for aya::maps::perf::PerfBufferError impl !core::panic::unwind_safe::RefUnwindSafe for aya::maps::perf::PerfBufferError impl !core::panic::unwind_safe::UnwindSafe for aya::maps::perf::PerfBufferError -pub struct aya::maps::perf::Events -pub aya::maps::perf::Events::lost: usize -pub aya::maps::perf::Events::read: usize -impl core::cmp::Eq for aya::maps::perf::Events -impl core::cmp::PartialEq for aya::maps::perf::Events -pub fn aya::maps::perf::Events::eq(&self, other: &aya::maps::perf::Events) -> bool -impl core::fmt::Debug for aya::maps::perf::Events -pub fn aya::maps::perf::Events::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result -impl core::marker::StructuralPartialEq for aya::maps::perf::Events -impl core::marker::Freeze for aya::maps::perf::Events -impl core::marker::Send for aya::maps::perf::Events -impl core::marker::Sync for aya::maps::perf::Events -impl core::marker::Unpin for aya::maps::perf::Events -impl core::marker::UnsafeUnpin for aya::maps::perf::Events -impl core::panic::unwind_safe::RefUnwindSafe for aya::maps::perf::Events -impl core::panic::unwind_safe::UnwindSafe for aya::maps::perf::Events +pub enum aya::maps::perf::PerfEvent<'a> +pub aya::maps::perf::PerfEvent::Lost +pub aya::maps::perf::PerfEvent::Lost::count: u64 +pub aya::maps::perf::PerfEvent::Sample +pub aya::maps::perf::PerfEvent::Sample::head: &'a [u8] +pub aya::maps::perf::PerfEvent::Sample::tail: &'a [u8] +impl<'a> core::fmt::Debug for aya::maps::perf::PerfEvent<'a> +pub fn aya::maps::perf::PerfEvent<'a>::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result +impl<'a> core::marker::Freeze for aya::maps::perf::PerfEvent<'a> +impl<'a> core::marker::Send for aya::maps::perf::PerfEvent<'a> +impl<'a> core::marker::Sync for aya::maps::perf::PerfEvent<'a> +impl<'a> core::marker::Unpin for aya::maps::perf::PerfEvent<'a> +impl<'a> core::marker::UnsafeUnpin for aya::maps::perf::PerfEvent<'a> +impl<'a> core::panic::unwind_safe::RefUnwindSafe for aya::maps::perf::PerfEvent<'a> +impl<'a> core::panic::unwind_safe::UnwindSafe for aya::maps::perf::PerfEvent<'a> pub struct aya::maps::perf::PerfEventArray impl> aya::maps::perf::PerfEventArray pub fn aya::maps::perf::PerfEventArray::pin>(&self, path: P) -> core::result::Result<(), aya::pin::PinError> @@ -304,8 +300,10 @@ impl core::panic::unwind_safe::RefUnwindSafe for aya::maps::perf::PerfEventAr impl core::panic::unwind_safe::UnwindSafe for aya::maps::perf::PerfEventArray where T: core::panic::unwind_safe::RefUnwindSafe pub struct aya::maps::perf::PerfEventArrayBuffer impl> aya::maps::perf::PerfEventArrayBuffer -pub fn aya::maps::perf::PerfEventArrayBuffer::read_events(&mut self, out_bufs: &mut [bytes::bytes_mut::BytesMut]) -> core::result::Result +pub fn aya::maps::perf::PerfEventArrayBuffer::fold(&mut self, init: C, f: F) -> C where F: core::ops::function::FnMut(C, aya::maps::perf::PerfEvent<'_>) -> C +pub fn aya::maps::perf::PerfEventArrayBuffer::for_each(&mut self, f: F) where F: core::ops::function::FnMut(aya::maps::perf::PerfEvent<'_>) pub fn aya::maps::perf::PerfEventArrayBuffer::readable(&self) -> bool +pub fn aya::maps::perf::PerfEventArrayBuffer::try_fold(&mut self, init: C, f: F) -> core::ops::control_flow::ControlFlow where F: core::ops::function::FnMut(C, aya::maps::perf::PerfEvent<'_>) -> core::ops::control_flow::ControlFlow impl> std::os::fd::owned::AsFd for aya::maps::perf::PerfEventArrayBuffer pub fn aya::maps::perf::PerfEventArrayBuffer::as_fd(&self) -> std::os::fd::owned::BorrowedFd<'_> impl> std::os::fd::raw::AsRawFd for aya::maps::perf::PerfEventArrayBuffer