diff --git a/aya/Cargo.toml b/aya/Cargo.toml index 41149f9be..d4134686a 100644 --- a/aya/Cargo.toml +++ b/aya/Cargo.toml @@ -20,7 +20,6 @@ workspace = true assert_matches = { workspace = true } aya-obj = { path = "../aya-obj", version = "^0.2.2" } bitflags = { workspace = true } -bytes = { workspace = true } # TODO(https://github.com/rust-lang/rust/issues/60896): Remove once # `std::collections::hash_set::Entry` is stabilized. hashbrown = { workspace = true, features = ["default-hasher", "equivalent"] } @@ -28,6 +27,7 @@ libc = { workspace = true } log = { workspace = true } object = { workspace = true, features = ["elf", "read_core", "std", "write"] } once_cell = { workspace = true } +scopeguard = { workspace = true } thiserror = { workspace = true } [dev-dependencies] diff --git a/aya/src/maps/perf/perf_buffer.rs b/aya/src/maps/perf/perf_buffer.rs index 02a46e9fa..93e2ef5fb 100644 --- a/aya/src/maps/perf/perf_buffer.rs +++ b/aya/src/maps/perf/perf_buffer.rs @@ -1,7 +1,10 @@ use std::{ + convert::Infallible, io, + ops::ControlFlow, os::fd::{AsFd, BorrowedFd}, - ptr, slice, + ptr::{self, NonNull}, + slice, sync::atomic::{self, Ordering}, }; @@ -9,7 +12,6 @@ use aya_obj::generated::{ PERF_FLAG_FD_CLOEXEC, perf_event_header, perf_event_mmap_page, perf_event_type::{PERF_RECORD_LOST, PERF_RECORD_SAMPLE}, }; -use bytes::BytesMut; use libc::{MAP_SHARED, PROT_READ, PROT_WRITE}; use thiserror::Error; @@ -55,34 +57,41 @@ pub enum PerfBufferError { io_error: io::Error, }, - /// `read_events()` was called with no output buffers. - #[error("read_events() was called with no output buffers")] - NoBuffers, - - /// `read_events()` was called with a buffer that is not large enough to - /// contain the next event in the perf buffer. - #[deprecated( - since = "0.10.8", - note = "read_events() now calls BytesMut::reserve() internally, so this error is never returned" - )] - #[error("the buffer needs to be of at least {size} bytes")] - MoreSpaceNeeded { - /// expected size - size: usize, - }, - /// An IO error occurred. #[error(transparent)] IOError(#[from] io::Error), } -/// Return type of `read_events()`. -#[derive(Debug, PartialEq, Eq)] -pub struct Events { - /// The number of events read. - pub read: usize, - /// The number of events lost. - pub lost: usize, +/// An event read from a perf event array buffer. +/// +/// Yielded by [`PerfEventArrayBuffer::try_fold`], [`PerfEventArrayBuffer::fold`], +/// and [`PerfEventArrayBuffer::for_each`]. +/// +/// [`PerfEventArrayBuffer::try_fold`]: super::PerfEventArrayBuffer::try_fold +/// [`PerfEventArrayBuffer::fold`]: super::PerfEventArrayBuffer::fold +/// [`PerfEventArrayBuffer::for_each`]: super::PerfEventArrayBuffer::for_each +#[derive(Debug)] +pub enum PerfEvent<'a> { + /// A sample emitted by `bpf_perf_event_output()`. The bytes are exposed + /// as up to two slices borrowed directly from the kernel-mapped ring + /// buffer; the second slice is empty for samples that fit contiguously, + /// and both are populated when a sample straddles the ring boundary. The + /// bytes include any kernel-side alignment padding that follows the + /// payload. + #[doc(alias = "PERF_RECORD_SAMPLE")] + Sample { + /// First chunk of the sample, or the entire sample when it does not wrap. + head: &'a [u8], + /// Second chunk; empty unless the sample straddles the ring boundary. + tail: &'a [u8], + }, + /// A signal from the kernel that samples were dropped because the ring + /// buffer was full. + #[doc(alias = "PERF_RECORD_LOST")] + Lost { + /// Number of dropped samples. + count: u64, + }, } #[cfg_attr(test, derive(Debug))] @@ -135,128 +144,189 @@ impl PerfBuffer { Ok(perf_buf) } - const fn buf(&self) -> ptr::NonNull { + const fn buf(&self) -> NonNull { self.mmap.ptr().cast() } - pub(crate) fn readable(&self) -> bool { - let header = self.buf().as_ptr(); - let head = unsafe { (*header).data_head } as usize; - let tail = unsafe { (*header).data_tail } as usize; - head != tail + /// Read `data_head`, the kernel's producer position. Pairs with the + /// kernel's `smp_wmb() + WRITE_ONCE()` publish [1]. + /// + /// [1]: https://github.com/torvalds/linux/blob/05f7e89a/kernel/events/ring_buffer.c#L113-L114 + fn data_head(&self) -> u64 { + // SAFETY: `self.buf()` points to the mmap'd `perf_event_mmap_page` for + // the lifetime of `self`, so the field projection is in-bounds and + // aligned. + let ptr = unsafe { &raw const (*self.buf().as_ptr()).data_head }; + // SAFETY: the kernel writes `data_head` concurrently; a plain read + // would be UB. + let value = unsafe { ptr::read_volatile(ptr) }; + atomic::fence(Ordering::Acquire); + value } - pub(crate) fn read_events( - &mut self, - buffers: &mut [BytesMut], - ) -> Result { - if buffers.is_empty() { - return Err(PerfBufferError::NoBuffers); - } - let header = self.buf().as_ptr(); - let base = unsafe { header.byte_add(self.page_size) }; + /// Read `data_tail`, the userspace consumer position. Userspace is the + /// sole writer, so a plain read suffices. + fn data_tail(&self) -> u64 { + // SAFETY: `self.buf()` points to the mmap'd `perf_event_mmap_page` + // for the lifetime of `self`. Only userspace writes this field, so + // there is no concurrent kernel write to race with. + unsafe { (*self.buf().as_ptr()).data_tail } + } - let mut events = Events { read: 0, lost: 0 }; - let mut buf_n = 0; + /// Mutable pointer to `data_tail`, used by [`try_fold`] to commit the + /// local position with a release-store on scope exit. + /// + /// [`try_fold`]: Self::try_fold + fn data_tail_ptr(&mut self) -> NonNull { + // SAFETY: `self.buf()` points to the mmap'd `perf_event_mmap_page` for + // the lifetime of `self`, so the field projection is in-bounds and + // aligned. + let ptr = unsafe { &raw mut (*self.buf().as_ptr()).data_tail }; + // SAFETY: `self.buf()` is `NonNull`, so a field projection within it + // is non-null. + unsafe { NonNull::new_unchecked(ptr) } + } - let fill_buf = |start_off, base: *const u8, mmap_size, out_buf: &mut [u8]| { - let len = out_buf.len(); + /// Pointer to the start of the data pages following the header page. + const fn data_pages(&self) -> *const u8 { + // SAFETY: the mmap spans `page_size + size` bytes; the data area + // starts at offset `page_size`. + unsafe { self.buf().as_ptr().byte_add(self.page_size) }.cast::() + } - let end = (start_off + len) % mmap_size; - let start = start_off % mmap_size; + pub(crate) fn readable(&self) -> bool { + self.data_head() != self.data_tail() + } - if start < end { - out_buf.copy_from_slice(unsafe { slice::from_raw_parts(base.add(start), len) }); - } else { - let size = mmap_size - start; + pub(crate) fn try_fold(&mut self, init: C, mut f: F) -> ControlFlow + where + F: FnMut(C, PerfEvent<'_>) -> ControlFlow, + { + let base = self.data_pages(); + let mmap_size = self.size; + let data_head = self.data_head(); + let initial_tail = self.data_tail(); + let data_tail_ptr = self.data_tail_ptr(); + + // Defer the userspace release-store of `data_tail` to scope exit so + // it pairs with the kernel's `READ_ONCE()` [1] in one SeqCst barrier + // instead of per event, and remains panic-safe. + // + // [1]: https://github.com/torvalds/linux/blob/05f7e89a/kernel/events/ring_buffer.c#L202 + let mut guard = scopeguard::guard(initial_tail, |tail| { + if tail != initial_tail { + atomic::fence(Ordering::SeqCst); + // SAFETY: `data_tail_ptr` was derived from a `&mut PerfBuffer` + // outliving this scope; userspace is the sole writer of + // `data_tail`. unsafe { - out_buf[..size].copy_from_slice(slice::from_raw_parts(base.add(start), size)); - out_buf[size..].copy_from_slice(slice::from_raw_parts(base, len - size)); + ptr::write_volatile(data_tail_ptr.as_ptr(), tail); } } - }; - - let read_event = |event_start, event_type, base, buf: &mut BytesMut| { - let sample_size = match event_type { - x if x == PERF_RECORD_SAMPLE as u32 || x == PERF_RECORD_LOST as u32 => { - let mut size = [0u8; size_of::()]; - fill_buf( - event_start + size_of::(), - base, - self.size, - &mut size, - ); - u32::from_ne_bytes(size) - } - _ => return Ok(None), - } as usize; - - let sample_start = - (event_start + size_of::() + size_of::()) % self.size; + }); - match event_type { + let tail = &mut *guard; + + let mut acc = init; + while *tail != data_head { + let event_start = (*tail % mmap_size as u64) as usize; + debug_assert_eq!( + event_start % 8, + 0, + "perf records are 8-byte aligned (event_start={event_start})" + ); + // SAFETY: the kernel pads each record to 8 bytes [1] and the + // buffer is page-aligned, so `base + event_start` is aligned for + // `perf_event_header` and the 8-byte header fits before the wrap + // boundary. + // + // [1]: https://github.com/torvalds/linux/blob/05f7e89a/kernel/events/core.c#L8451 + let event: perf_event_header = unsafe { ptr::read(base.add(event_start).cast()) }; + *tail = tail.wrapping_add(u64::from(event.size)); + + let perf_event = match event.type_ { x if x == PERF_RECORD_SAMPLE as u32 => { - buf.clear(); - buf.reserve(sample_size); - unsafe { buf.set_len(sample_size) } - - fill_buf(sample_start, base, self.size, buf); - - Ok(Some((1, 0))) + // The `u32` size prefix follows the header at an 8-aligned + // offset, so it never spans the wrap. + let size_offset = (event_start + size_of::()) % mmap_size; + // SAFETY: same kernel-alignment guarantee as the header read. + let sample_size: u32 = unsafe { ptr::read(base.add(size_offset).cast()) }; + let sample_size = sample_size as usize; + + // The sample payload follows the size prefix and may span + // the wrap. + let sample_start = + (event_start + size_of::() + size_of::()) + % mmap_size; + debug_assert!(sample_size <= mmap_size); + let (head, tail) = if let Some(second) = + (sample_start + sample_size).checked_sub(mmap_size) + { + let first = mmap_size - sample_start; + // SAFETY: `[sample_start, mmap_size)` and `[0, second)` + // are disjoint ranges within the mmap region; the + // kernel will not overwrite them until `data_tail` + // is committed on scope exit. + let head = unsafe { slice::from_raw_parts(base.add(sample_start), first) }; + let tail = unsafe { slice::from_raw_parts(base, second) }; + (head, tail) + } else { + // SAFETY: same as above; `sample_start + sample_size + // <= mmap_size`, contiguous within mmap. + let head = + unsafe { slice::from_raw_parts(base.add(sample_start), sample_size) }; + (head, &[][..]) + }; + PerfEvent::Sample { head, tail } } x if x == PERF_RECORD_LOST as u32 => { - let mut count = [0u8; size_of::()]; - fill_buf( - event_start + size_of::() + size_of::(), - base, - self.size, - &mut count, - ); - Ok(Some((0, u64::from_ne_bytes(count) as usize))) + // `PERF_RECORD_LOST` layout is + // `{ header, u64 id, u64 lost, sample_id }` [1]; skip past + // `id` to read the `lost` count. + // + // [1]: https://github.com/torvalds/linux/blob/05f7e89a/include/uapi/linux/perf_event.h#L906-L914 + let lost_offset = + (event_start + size_of::() + size_of::()) + % mmap_size; + // SAFETY: same kernel-alignment guarantee as the header read; the + // fixed 8-byte field at an 8-aligned offset cannot span the wrap. + let count: u64 = unsafe { ptr::read(base.add(lost_offset).cast()) }; + PerfEvent::Lost { count } } - _ => Ok(None), - } - }; + event_type => { + // `PerfBuffer::open` configures `SoftwareEvent::BpfOutput` + // with no side-band attr flags [1]; the kernel only emits + // SAMPLE and LOST. + // + // [1]: https://github.com/torvalds/linux/blob/05f7e89a/kernel/events/core.c#L5182-L5200 + debug_assert!(false, "unexpected perf record type: {event_type}"); + continue; + } + }; - let head = unsafe { (*header).data_head } as usize; - let mut tail = unsafe { (*header).data_tail } as usize; - let result = loop { - if head == tail { - break Ok(()); + match f(acc, perf_event) { + ControlFlow::Continue(next) => acc = next, + ControlFlow::Break(v) => return ControlFlow::Break(v), } - if buf_n == buffers.len() { - break Ok(()); - } - - let buf = &mut buffers[buf_n]; - - let event_start = tail % self.size; - let event: perf_event_header = - unsafe { ptr::read_unaligned(base.byte_add(event_start).cast()) }; - let event_size = event.size as usize; + } - match read_event(event_start, event.type_, base.cast(), buf) { - Ok(Some((read, lost))) => { - if read > 0 { - buf_n += 1; - events.read += read; - } - events.lost += lost; - } - Ok(None) => { /* skip unknown event type */ } - Err(e) => { - // we got an error and we didn't process any events, propagate the error - // and give the caller a chance to increase buffers - break Err(e); - } - } - tail += event_size; - }; + ControlFlow::Continue(acc) + } - atomic::fence(Ordering::SeqCst); - unsafe { (*header).data_tail = tail as u64 } + pub(crate) fn fold(&mut self, init: C, mut f: F) -> C + where + F: FnMut(C, PerfEvent<'_>) -> C, + { + let ControlFlow::Continue(acc) = self + .try_fold::(init, |acc, event| ControlFlow::Continue(f(acc, event))); + acc + } - result.map(|()| events) + pub(crate) fn for_each(&mut self, mut f: F) + where + F: FnMut(PerfEvent<'_>), + { + self.fold((), |(), event| f(event)) } } @@ -278,6 +348,7 @@ mod tests { use std::fmt::Debug; use assert_matches::assert_matches; + use test_case::test_case; use super::*; use crate::sys::{Syscall, TEST_MMAP_RET, override_syscall}; @@ -322,34 +393,6 @@ mod tests { ); } - #[test] - fn test_no_out_bufs() { - let mut mmapped_buf = MMappedBuf { - data: [0; PAGE_SIZE * 2], - }; - - fake_mmap(&mut mmapped_buf); - let mut buf = PerfBuffer::open(1, PAGE_SIZE, 1).unwrap(); - - assert_matches!(buf.read_events(&mut []), Err(PerfBufferError::NoBuffers)) - } - - #[test] - fn test_no_events() { - let mut mmapped_buf = MMappedBuf { - data: [0; PAGE_SIZE * 2], - }; - - fake_mmap(&mut mmapped_buf); - let mut buf = PerfBuffer::open(1, PAGE_SIZE, 1).unwrap(); - - let out_buf = BytesMut::with_capacity(4); - assert_eq!( - buf.read_events(&mut [out_buf]).unwrap(), - Events { read: 0, lost: 0 } - ); - } - fn write(mmapped_buf: &mut MMappedBuf, offset: usize, value: T) -> usize { let dst: *mut _ = mmapped_buf; let head = offset + size_of::(); @@ -360,45 +403,9 @@ mod tests { head } - #[test] - fn test_read_first_lost() { - #[repr(C)] - #[derive(Debug)] - struct LostSamples { - header: perf_event_header, - id: u64, - count: u64, - } - - let mut mmapped_buf = MMappedBuf { - data: [0; PAGE_SIZE * 2], - }; - - write( - &mut mmapped_buf, - 0, - LostSamples { - header: perf_event_header { - type_: PERF_RECORD_LOST as u32, - misc: 0, - size: size_of::() as u16, - }, - id: 1, - count: 0xCAFEBABE, - }, - ); - - fake_mmap(&mut mmapped_buf); - let mut buf = PerfBuffer::open(1, PAGE_SIZE, 1).unwrap(); - - let out_buf = BytesMut::with_capacity(0); - let events = buf.read_events(&mut [out_buf]).unwrap(); - assert_eq!(events.lost, 0xCAFEBABE); - } - #[repr(C)] #[derive(Debug)] - struct PerfSample { + struct TestPerfRecord { s_hdr: Sample, value: T, } @@ -407,12 +414,12 @@ mod tests { write( mmapped_buf, offset, - PerfSample { + TestPerfRecord { s_hdr: Sample { header: perf_event_header { type_: PERF_RECORD_SAMPLE as u32, misc: 0, - size: size_of::>() as u16, + size: size_of::>() as u16, }, size: size_of::() as u32, }, @@ -425,166 +432,127 @@ mod tests { u32::from_ne_bytes(buf[..4].try_into().unwrap()) } - fn u64_from_buf(buf: &[u8]) -> u64 { - u64::from_ne_bytes(buf[..8].try_into().unwrap()) - } - - #[test] - fn test_read_first_sample() { - let mut mmapped_buf = MMappedBuf { - data: [0; PAGE_SIZE * 2], - }; - - write_sample(&mut mmapped_buf, 0, 0xCAFEBABEu32); - - fake_mmap(&mut mmapped_buf); - let mut buf = PerfBuffer::open(1, PAGE_SIZE, 1).unwrap(); - - let mut out_bufs = [BytesMut::with_capacity(4)]; - - let events = buf.read_events(&mut out_bufs).unwrap(); - assert_eq!(events, Events { lost: 0, read: 1 }); - assert_eq!(u32_from_buf(&out_bufs[0]), 0xCAFEBABE); - } - - #[test] - fn test_read_many_with_many_reads() { - let mut mmapped_buf = MMappedBuf { - data: [0; PAGE_SIZE * 2], - }; - - let next = write_sample(&mut mmapped_buf, 0, 0xCAFEBABEu32); - write_sample(&mut mmapped_buf, next, 0xBADCAFEu32); - - fake_mmap(&mut mmapped_buf); - let mut buf = PerfBuffer::open(1, PAGE_SIZE, 1).unwrap(); - - let mut out_bufs = [BytesMut::with_capacity(4)]; - - let events = buf.read_events(&mut out_bufs).unwrap(); - assert_eq!(events, Events { lost: 0, read: 1 }); - assert_eq!(u32_from_buf(&out_bufs[0]), 0xCAFEBABE); - - let events = buf.read_events(&mut out_bufs).unwrap(); - assert_eq!(events, Events { lost: 0, read: 1 }); - assert_eq!(u32_from_buf(&out_bufs[0]), 0xBADCAFE); + fn sample_bytes(head: &[u8], tail: &[u8]) -> Vec { + let mut v = Vec::with_capacity(head.len() + tail.len()); + v.extend_from_slice(head); + v.extend_from_slice(tail); + v } - #[test] - fn test_read_many_with_one_read() { + #[test_case(&[] ; "empty")] + #[test_case(&[0xCAFEBABEu32] ; "single")] + #[test_case(&[0xCAFEBABEu32, 0xBADCAFEu32] ; "consecutive")] + fn test_for_each_samples(expected: &[u32]) { let mut mmapped_buf = MMappedBuf { data: [0; PAGE_SIZE * 2], }; - let next = write_sample(&mut mmapped_buf, 0, 0xCAFEBABEu32); - write_sample(&mut mmapped_buf, next, 0xBADCAFEu32); + let mut offset = 0; + for &v in expected { + offset = write_sample(&mut mmapped_buf, offset, v); + } fake_mmap(&mut mmapped_buf); let mut buf = PerfBuffer::open(1, PAGE_SIZE, 1).unwrap(); - let mut out_bufs = std::iter::repeat_n(BytesMut::with_capacity(4), 3).collect::>(); - - let events = buf.read_events(&mut out_bufs).unwrap(); - assert_eq!(events, Events { lost: 0, read: 2 }); - assert_eq!(u32_from_buf(&out_bufs[0]), 0xCAFEBABE); - assert_eq!(u32_from_buf(&out_bufs[1]), 0xBADCAFE); + let mut payloads = Vec::new(); + buf.for_each(|event| match event { + PerfEvent::Sample { head, tail } => { + payloads.push(u32_from_buf(&sample_bytes(head, tail))); + } + PerfEvent::Lost { count } => panic!("unexpected lost: {count}"), + }); + assert_eq!(payloads, expected); } #[test] - fn test_read_last_sample() { - let mut mmapped_buf = MMappedBuf { - data: [0; PAGE_SIZE * 2], - }; - - let offset = PAGE_SIZE - size_of::>(); - write_sample(&mut mmapped_buf, offset, 0xCAFEBABEu32); - mmapped_buf.mmap_page.data_tail = offset as u64; - - fake_mmap(&mut mmapped_buf); - let mut buf = PerfBuffer::open(1, PAGE_SIZE, 1).unwrap(); - - let mut out_bufs = [BytesMut::with_capacity(4)]; - - let events = buf.read_events(&mut out_bufs).unwrap(); - assert_eq!(events, Events { lost: 0, read: 1 }); - assert_eq!(u32_from_buf(&out_bufs[0]), 0xCAFEBABE); - } + fn test_for_each_lost() { + #[repr(C)] + #[derive(Debug)] + struct LostSamples { + header: perf_event_header, + id: u64, + count: u64, + } - #[test] - fn test_read_wrapping_sample_size() { let mut mmapped_buf = MMappedBuf { data: [0; PAGE_SIZE * 2], }; - let offset = PAGE_SIZE - size_of::() - 2; write( &mut mmapped_buf, - offset, - perf_event_header { - type_: PERF_RECORD_SAMPLE as u32, - misc: 0, - size: size_of::>() as u16, + 0, + LostSamples { + header: perf_event_header { + type_: PERF_RECORD_LOST as u32, + misc: 0, + size: size_of::() as u16, + }, + id: 1, + count: 0xCAFEBABE, }, ); - mmapped_buf.mmap_page.data_tail = offset as u64; - - let (left, right) = if cfg!(target_endian = "little") { - (0x0004u16, 0x0000u16) - } else { - (0x0000u16, 0x0004u16) - }; - write(&mut mmapped_buf, PAGE_SIZE - 2, left); - write(&mut mmapped_buf, 0, right); - write(&mut mmapped_buf, 2, 0xBAADCAFEu32); fake_mmap(&mut mmapped_buf); let mut buf = PerfBuffer::open(1, PAGE_SIZE, 1).unwrap(); - let mut out_bufs = [BytesMut::with_capacity(8)]; - - let events = buf.read_events(&mut out_bufs).unwrap(); - assert_eq!(events, Events { lost: 0, read: 1 }); - assert_eq!(u32_from_buf(&out_bufs[0]), 0xBAADCAFE); + let mut events = Vec::new(); + buf.for_each(|event| match event { + PerfEvent::Sample { .. } => panic!("unexpected sample"), + PerfEvent::Lost { count } => events.push(count), + }); + assert_eq!(events, [0xCAFEBABE]); } - #[test] - fn test_read_wrapping_value() { - let mut mmapped_buf = MMappedBuf { - data: [0; PAGE_SIZE * 2], - }; - + // The `write` helper sets `data_head` to the post-write absolute byte + // offset, which is convenient for non-wrapping records but does not match + // the kernel-side semantics of monotonic `data_head`/`data_tail` counters. + // Wrap fixtures fix `data_head` up afterwards to match the real kernel + // contract (tail + total_event_size). + fn fixture_wrap_data(mmapped_buf: &mut MMappedBuf) -> &'static [u8] { let (left, right) = if cfg!(target_endian = "little") { (0xCAFEBABEu32, 0xBAADCAFEu32) } else { (0xBAADCAFEu32, 0xCAFEBABEu32) }; - - let offset = PAGE_SIZE - size_of::>(); + let offset = PAGE_SIZE - size_of::>(); write( - &mut mmapped_buf, + mmapped_buf, offset, - PerfSample { + TestPerfRecord { s_hdr: Sample { header: perf_event_header { type_: PERF_RECORD_SAMPLE as u32, misc: 0, - size: size_of::>() as u16, + size: size_of::>() as u16, }, size: size_of::() as u32, }, value: left, }, ); - write(&mut mmapped_buf, 0, right); + write(mmapped_buf, 0, right); mmapped_buf.mmap_page.data_tail = offset as u64; + mmapped_buf.mmap_page.data_head = (offset + size_of::>()) as u64; + static EXPECTED: [u8; 8] = 0xBAADCAFECAFEBABEu64.to_ne_bytes(); + &EXPECTED + } + + #[test] + fn test_for_each_wrap() { + let mut mmapped_buf = MMappedBuf { + data: [0; PAGE_SIZE * 2], + }; + let expected = fixture_wrap_data(&mut mmapped_buf); fake_mmap(&mut mmapped_buf); let mut buf = PerfBuffer::open(1, PAGE_SIZE, 1).unwrap(); - let mut out_bufs = [BytesMut::with_capacity(8)]; - - let events = buf.read_events(&mut out_bufs).unwrap(); - assert_eq!(events, Events { lost: 0, read: 1 }); - assert_eq!(u64_from_buf(&out_bufs[0]), 0xBAADCAFECAFEBABE); + let mut got: Vec = Vec::new(); + buf.for_each(|event| match event { + PerfEvent::Sample { head, tail } => got.extend(sample_bytes(head, tail)), + PerfEvent::Lost { count } => panic!("unexpected lost: {count}"), + }); + assert_eq!(&got[..expected.len()], expected); } } diff --git a/aya/src/maps/perf/perf_event_array.rs b/aya/src/maps/perf/perf_event_array.rs index d992f3d96..03dc483a3 100644 --- a/aya/src/maps/perf/perf_event_array.rs +++ b/aya/src/maps/perf/perf_event_array.rs @@ -4,18 +4,16 @@ use std::{ borrow::{Borrow, BorrowMut}, - ops::Deref as _, + ops::{ControlFlow, Deref as _}, os::fd::{AsFd, AsRawFd, BorrowedFd, RawFd}, path::Path, sync::Arc, }; -use bytes::BytesMut; - use crate::{ maps::{ MapData, MapError, PinError, - perf::{Events, PerfBuffer, PerfBufferError}, + perf::{PerfBuffer, PerfBufferError, PerfEvent}, }, sys::bpf_map_update_elem, util::page_size, @@ -39,21 +37,45 @@ impl> PerfEventArrayBuffer { self.buf.readable() } - /// Reads events from the buffer. + /// Processes events available in the buffer with `f`. /// - /// This method reads events into the provided slice of buffers, filling - /// each buffer in order stopping when there are no more events to read or - /// all the buffers have been filled. + /// For each available event, `f` receives the accumulator and event: + /// * [`ControlFlow::Continue(next)`](ControlFlow::Continue) keeps draining with `next`. + /// * [`ControlFlow::Break(break_value)`](ControlFlow::Break) stops early and returns `break_value`. /// - /// Returns the number of events read and the number of events lost. Events - /// are lost when user space doesn't read events fast enough and the ring - /// buffer fills up. + /// If the buffer is fully drained, returns [`ControlFlow::Continue`] + /// containing the final accumulator. /// - /// # Errors + /// The slices in [`PerfEvent::Sample`] are borrowed directly from the perf + /// ring buffer; the borrow is bounded by the closure invocation. The + /// kernel-visible `data_tail` is advanced once at the end of the call, + /// amortizing the `SeqCst` store across the drain. + pub fn try_fold(&mut self, init: C, f: F) -> ControlFlow + where + F: FnMut(C, PerfEvent<'_>) -> ControlFlow, + { + self.buf.try_fold(init, f) + } + + /// Processes events available in the buffer with `f`. /// - /// [`PerfBufferError::NoBuffers`] is returned when `out_bufs` is empty. - pub fn read_events(&mut self, out_bufs: &mut [BytesMut]) -> Result { - self.buf.read_events(out_bufs) + /// For each available event, `f` receives the accumulator and event, and + /// returns the next accumulator. Unlike [`PerfEventArrayBuffer::try_fold`], + /// this function cannot short-circuit: it always processes events until the + /// buffer is fully drained, then returns the final accumulator. + pub fn fold(&mut self, init: C, f: F) -> C + where + F: FnMut(C, PerfEvent<'_>) -> C, + { + self.buf.fold(init, f) + } + + /// Processes events available in the buffer with `f`. + pub fn for_each(&mut self, f: F) + where + F: FnMut(PerfEvent<'_>), + { + self.buf.for_each(f) } } @@ -78,7 +100,10 @@ impl> AsRawFd for PerfEventArrayBuffer { /// * call [`PerfEventArray::open`] /// * poll the returned [`PerfEventArrayBuffer`] to be notified when events are /// inserted in the buffer -/// * call [`PerfEventArrayBuffer::read_events`] to read the events +/// * drain events with [`PerfEventArrayBuffer::for_each`] (or [`fold`]/[`try_fold`]) +/// +/// [`fold`]: PerfEventArrayBuffer::fold +/// [`try_fold`]: PerfEventArrayBuffer::try_fold /// /// # Minimum kernel version /// @@ -90,7 +115,7 @@ impl> AsRawFd for PerfEventArrayBuffer { /// available CPU: /// /// ```no_run -/// # use aya::maps::perf::PerfEventArrayBuffer; +/// # use aya::maps::perf::{PerfEvent, PerfEventArrayBuffer}; /// # use aya::maps::MapData; /// # use std::borrow::BorrowMut; /// # struct Poll { _t: std::marker::PhantomData }; @@ -116,7 +141,6 @@ impl> AsRawFd for PerfEventArrayBuffer { /// # let mut bpf = aya::Ebpf::load(&[])?; /// use aya::maps::PerfEventArray; /// use aya::util::online_cpus; -/// use bytes::BytesMut; /// /// let mut perf_array = PerfEventArray::try_from(bpf.map_mut("EVENTS").unwrap())?; /// @@ -128,14 +152,18 @@ impl> AsRawFd for PerfEventArrayBuffer { /// perf_buffers.push(perf_array.open(cpu_id, None)?); /// } /// -/// let mut out_bufs = [BytesMut::with_capacity(1024)]; -/// /// // poll the buffers to know when they have queued events /// let poll = poll_buffers(perf_buffers); /// loop { -/// for read_buf in poll.poll_readable() { -/// read_buf.read_events(&mut out_bufs)?; -/// // process out_bufs +/// for perf_buf in poll.poll_readable() { +/// perf_buf.for_each(|event| match event { +/// PerfEvent::Sample { head, tail } => { +/// // process the sample bytes (`tail` is empty unless the sample wraps) +/// } +/// PerfEvent::Lost { count } => { +/// // record the dropped-events counter +/// } +/// }); /// } /// } /// diff --git a/test/integration-test/Cargo.toml b/test/integration-test/Cargo.toml index 241bf1ae3..81e5fec45 100644 --- a/test/integration-test/Cargo.toml +++ b/test/integration-test/Cargo.toml @@ -21,7 +21,6 @@ anyhow = { workspace = true, features = ["std"] } assert_matches = { workspace = true } aya-log = { path = "../../aya-log", version = "^0.2.2", default-features = false } aya-obj = { path = "../../aya-obj", version = "^0.2.2" } -bytes = { workspace = true } epoll = { workspace = true } futures = { workspace = true, features = ["alloc"] } integration-common = { path = "../integration-common", features = ["user"] } diff --git a/test/integration-test/src/tests/perf_event_array.rs b/test/integration-test/src/tests/perf_event_array.rs index 4c3affc04..4edfdbcb7 100644 --- a/test/integration-test/src/tests/perf_event_array.rs +++ b/test/integration-test/src/tests/perf_event_array.rs @@ -1,10 +1,9 @@ use aya::{ EbpfLoader, - maps::{PerfEventArray, perf::Events}, + maps::{PerfEventArray, perf::PerfEvent}, programs::{UProbe, uprobe::UProbeScope}, util::online_cpus, }; -use bytes::BytesMut; use test_case::test_case; #[unsafe(no_mangle)] @@ -48,29 +47,16 @@ fn emit_event(bpf_obj: &[u8], events_map: &str, prog: &str) { trigger_emit_event(); - // PERF_SAMPLE_RAW is encoded as `u32 size + data`; the kernel rounds the - // field's total to 8 bytes. `read_events` copies `size` bytes (payload + pad). - const PADDED_SAMPLE_SIZE: usize = { - let size_field = size_of::(); - (size_field + size_of::()).next_multiple_of(8) - size_field - }; - let mut payloads = Vec::new(); for buf in &mut buffers { - let mut out = [BytesMut::from(&[0xAAu8; PADDED_SAMPLE_SIZE * 2][..])]; - let tail = out[0].split_off(PADDED_SAMPLE_SIZE); - let Events { read, lost } = buf.read_events(&mut out).unwrap(); - assert_eq!(lost, 0); - assert!( - tail.iter().all(|&b| b == 0xAA), - "bytes beyond payload were overwritten", - ); - for sample in &out[..read] { - assert_eq!(sample.len(), PADDED_SAMPLE_SIZE); - payloads.push(u64::from_ne_bytes( - sample[..size_of::()].try_into().unwrap(), - )); - } + buf.for_each(|event| match event { + PerfEvent::Sample { head, .. } => { + payloads.push(u64::from_ne_bytes( + head[..size_of::()].try_into().unwrap(), + )); + } + PerfEvent::Lost { count } => panic!("kernel dropped {count} samples"), + }); } assert_eq!(payloads, [0xDEAD_BEEFu64]); } diff --git a/xtask/public-api/aya.txt b/xtask/public-api/aya.txt index d6acf99a7..eeeae67c8 100644 --- a/xtask/public-api/aya.txt +++ b/xtask/public-api/aya.txt @@ -243,9 +243,6 @@ pub aya::maps::perf::PerfBufferError::InvalidPageCount pub aya::maps::perf::PerfBufferError::InvalidPageCount::page_count: usize pub aya::maps::perf::PerfBufferError::MMapError pub aya::maps::perf::PerfBufferError::MMapError::io_error: std::io::error::Error -pub aya::maps::perf::PerfBufferError::MoreSpaceNeeded -pub aya::maps::perf::PerfBufferError::MoreSpaceNeeded::size: usize -pub aya::maps::perf::PerfBufferError::NoBuffers pub aya::maps::perf::PerfBufferError::OpenError pub aya::maps::perf::PerfBufferError::OpenError::io_error: std::io::error::Error pub aya::maps::perf::PerfBufferError::PerfEventEnableError @@ -265,22 +262,21 @@ impl core::marker::Unpin for aya::maps::perf::PerfBufferError impl core::marker::UnsafeUnpin for aya::maps::perf::PerfBufferError impl !core::panic::unwind_safe::RefUnwindSafe for aya::maps::perf::PerfBufferError impl !core::panic::unwind_safe::UnwindSafe for aya::maps::perf::PerfBufferError -pub struct aya::maps::perf::Events -pub aya::maps::perf::Events::lost: usize -pub aya::maps::perf::Events::read: usize -impl core::cmp::Eq for aya::maps::perf::Events -impl core::cmp::PartialEq for aya::maps::perf::Events -pub fn aya::maps::perf::Events::eq(&self, other: &aya::maps::perf::Events) -> bool -impl core::fmt::Debug for aya::maps::perf::Events -pub fn aya::maps::perf::Events::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result -impl core::marker::StructuralPartialEq for aya::maps::perf::Events -impl core::marker::Freeze for aya::maps::perf::Events -impl core::marker::Send for aya::maps::perf::Events -impl core::marker::Sync for aya::maps::perf::Events -impl core::marker::Unpin for aya::maps::perf::Events -impl core::marker::UnsafeUnpin for aya::maps::perf::Events -impl core::panic::unwind_safe::RefUnwindSafe for aya::maps::perf::Events -impl core::panic::unwind_safe::UnwindSafe for aya::maps::perf::Events +pub enum aya::maps::perf::PerfEvent<'a> +pub aya::maps::perf::PerfEvent::Lost +pub aya::maps::perf::PerfEvent::Lost::count: u64 +pub aya::maps::perf::PerfEvent::Sample +pub aya::maps::perf::PerfEvent::Sample::head: &'a [u8] +pub aya::maps::perf::PerfEvent::Sample::tail: &'a [u8] +impl<'a> core::fmt::Debug for aya::maps::perf::PerfEvent<'a> +pub fn aya::maps::perf::PerfEvent<'a>::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result +impl<'a> core::marker::Freeze for aya::maps::perf::PerfEvent<'a> +impl<'a> core::marker::Send for aya::maps::perf::PerfEvent<'a> +impl<'a> core::marker::Sync for aya::maps::perf::PerfEvent<'a> +impl<'a> core::marker::Unpin for aya::maps::perf::PerfEvent<'a> +impl<'a> core::marker::UnsafeUnpin for aya::maps::perf::PerfEvent<'a> +impl<'a> core::panic::unwind_safe::RefUnwindSafe for aya::maps::perf::PerfEvent<'a> +impl<'a> core::panic::unwind_safe::UnwindSafe for aya::maps::perf::PerfEvent<'a> pub struct aya::maps::perf::PerfEventArray impl> aya::maps::perf::PerfEventArray pub fn aya::maps::perf::PerfEventArray::pin>(&self, path: P) -> core::result::Result<(), aya::pin::PinError> @@ -304,8 +300,10 @@ impl core::panic::unwind_safe::RefUnwindSafe for aya::maps::perf::PerfEventAr impl core::panic::unwind_safe::UnwindSafe for aya::maps::perf::PerfEventArray where T: core::panic::unwind_safe::RefUnwindSafe pub struct aya::maps::perf::PerfEventArrayBuffer impl> aya::maps::perf::PerfEventArrayBuffer -pub fn aya::maps::perf::PerfEventArrayBuffer::read_events(&mut self, out_bufs: &mut [bytes::bytes_mut::BytesMut]) -> core::result::Result +pub fn aya::maps::perf::PerfEventArrayBuffer::fold(&mut self, init: C, f: F) -> C where F: core::ops::function::FnMut(C, aya::maps::perf::PerfEvent<'_>) -> C +pub fn aya::maps::perf::PerfEventArrayBuffer::for_each(&mut self, f: F) where F: core::ops::function::FnMut(aya::maps::perf::PerfEvent<'_>) pub fn aya::maps::perf::PerfEventArrayBuffer::readable(&self) -> bool +pub fn aya::maps::perf::PerfEventArrayBuffer::try_fold(&mut self, init: C, f: F) -> core::ops::control_flow::ControlFlow where F: core::ops::function::FnMut(C, aya::maps::perf::PerfEvent<'_>) -> core::ops::control_flow::ControlFlow impl> std::os::fd::owned::AsFd for aya::maps::perf::PerfEventArrayBuffer pub fn aya::maps::perf::PerfEventArrayBuffer::as_fd(&self) -> std::os::fd::owned::BorrowedFd<'_> impl> std::os::fd::raw::AsRawFd for aya::maps::perf::PerfEventArrayBuffer