Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions aya-log/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,9 +182,7 @@ impl<T: Log> EbpfLogger<T> {
/// Reads log records from eBPF and writes them to the logger.
pub fn flush(&mut self) {
let Self { ring_buf, logger } = self;
while let Some(buf) = ring_buf.next() {
log_buf(buf.as_ref(), logger).unwrap();
}
ring_buf.for_each(|buf| log_buf(buf, logger).unwrap());
}
}

Expand Down
1 change: 1 addition & 0 deletions aya/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ libc = { workspace = true }
log = { workspace = true }
object = { workspace = true, features = ["elf", "read_core", "std", "write"] }
once_cell = { workspace = true }
scopeguard = { workspace = true }
thiserror = { workspace = true }

[dev-dependencies]
Expand Down
291 changes: 215 additions & 76 deletions aya/src/maps/ring_buf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@

use std::{
borrow::Borrow,
convert::Infallible,
fmt::{self, Debug, Formatter},
ops::Deref,
ops::{ControlFlow, Deref},
os::fd::{AsFd, AsRawFd, BorrowedFd, RawFd},
sync::atomic::{AtomicU32, AtomicUsize, Ordering},
};
Expand Down Expand Up @@ -37,7 +38,9 @@ use crate::{
///
/// To receive events you need to:
/// * Construct [`RingBuf`] using [`RingBuf::try_from`].
/// * Call [`RingBuf::next`] to poll events from the [`RingBuf`].
/// * Call [`RingBuf::next`] to read one entry at a time, or
/// [`RingBuf::try_fold`], [`RingBuf::fold`], or [`RingBuf::for_each`] to
/// process available entries in bulk.
///
/// To receive async notifications of data availability, you may construct an
/// [`tokio::io::unix::AsyncFd`] from the [`RingBuf`]'s file descriptor and poll it for readiness.
Expand Down Expand Up @@ -71,9 +74,9 @@ use crate::{
/// loop {
/// let mut guard = poll.readable();
/// let ring_buf = guard.inner_mut();
/// while let Some(item) = ring_buf.next() {
/// ring_buf.for_each(|item| {
/// println!("received: {:?}", item);
/// }
/// });
/// guard.clear_ready();
/// }
/// # Ok::<(), aya::EbpfError>(())
Expand Down Expand Up @@ -128,10 +131,60 @@ impl<T> RingBuf<T> {
)]
pub fn next(&mut self) -> Option<RingBufItem<'_>> {
let Self {
consumer, producer, ..
map: _,
consumer,
producer,
} = self;
producer.next(consumer)
}

/// Drains available entries through `f`, threading an accumulator and
/// allowing `f` to stop early.
///
/// `f` is invoked with the current accumulator and the bytes of each
/// available data entry, and decides what happens next:
/// * [`ControlFlow::Continue(next)`](ControlFlow::Continue) moves on to the following entry,
///   using `next` as the accumulator.
/// * [`ControlFlow::Break(break_value)`](ControlFlow::Break) ends the drain immediately and
///   hands `break_value` back to the caller.
///
/// If no entry causes a break, the result is [`ControlFlow::Continue`]
/// holding the final accumulator.
///
/// # Consumption
///
/// An entry is consumed once it has been handed to `f`, regardless of how
/// `f` finishes: the entry on which `f` breaks is consumed, and so is the
/// entry being processed if `f` panics.
pub fn try_fold<B, C, F>(&mut self, init: C, f: F) -> ControlFlow<B, C>
where
    F: FnMut(C, &[u8]) -> ControlFlow<B, C>,
{
    // Only the producer and consumer state take part in draining; the map
    // handle itself is not needed here.
    let Self {
        producer: ring,
        consumer: cursor,
        map: _,
    } = self;
    ring.try_fold(cursor, init, f)
}

/// Folds every available entry into an accumulator.
///
/// `f` is called with the current accumulator and the bytes of each
/// available data entry; whatever it returns becomes the accumulator for
/// the next call.
///
/// This is the non-short-circuiting counterpart of [`RingBuf::try_fold`]:
/// `f` has no way to stop early, so entries are processed until the ring
/// buffer is fully drained and the final accumulator is returned.
pub fn fold<C, F>(&mut self, init: C, mut f: F) -> C
where
    F: FnMut(C, &[u8]) -> C,
{
    // The break type is `Infallible`, so the wrapped closure can only ever
    // ask to continue.
    let outcome: ControlFlow<Infallible, C> =
        self.try_fold(init, |acc, entry| ControlFlow::Continue(f(acc, entry)));
    match outcome {
        ControlFlow::Continue(total) => total,
        // `Infallible` has no values, so this arm can never be reached.
        ControlFlow::Break(never) => match never {},
    }
}

/// Calls `f` on every available entry in the ring buffer.
///
/// `f` receives the bytes of each available data entry in turn. This is
/// [`RingBuf::fold`] with a unit accumulator, so it likewise cannot stop
/// early.
pub fn for_each<F>(&mut self, mut f: F)
where
    F: FnMut(&[u8]),
{
    // There is nothing to accumulate: thread `()` through the fold.
    self.fold((), |(), entry| {
        f(entry);
    });
}
}

impl<T: Borrow<MapData>> AsFd for RingBuf<T> {
Expand All @@ -151,7 +204,7 @@ impl<T: Borrow<MapData>> AsRawFd for RingBuf<T> {
}
}

/// The current outstanding item read from the ringbuf.
/// An item read from the ring buffer.
pub struct RingBufItem<'a> {
data: &'a [u8],
consumer: &'a mut ConsumerPos,
Expand All @@ -161,15 +214,16 @@ impl Deref for RingBufItem<'_> {
type Target = [u8];

fn deref(&self) -> &Self::Target {
let Self { data, .. } = self;
let Self { data, consumer: _ } = self;
data
}
}

impl Drop for RingBufItem<'_> {
fn drop(&mut self) {
let Self { consumer, data } = self;
consumer.consume(data.len())
let Self { data, consumer } = self;
consumer.consume(data.len());
consumer.commit();
}
}

Expand Down Expand Up @@ -228,10 +282,13 @@ impl ConsumerPos {
}

fn consume(&mut self, len: usize) {
let Self { pos, metadata } = self;
let Self { pos, metadata: _ } = self;

*pos += (usize::try_from(BPF_RINGBUF_HDR_SZ).unwrap() + len).next_multiple_of(8);
}

fn commit(&self) {
let Self { pos, metadata } = self;
// Write operation needs to be properly ordered with respect to the producer committing new
// data to the ringbuf. The producer uses xchg (SeqCst) to commit new data [1]. The producer
// reads the consumer offset after clearing the busy bit on a new entry [2]. By using SeqCst
Expand Down Expand Up @@ -321,89 +378,171 @@ impl ProducerData {
pos_cache,
mask,
} = self;
let mmap = &*mmap;
let data_pages = Self::data_pages(mmap, *data_offset);
loop {
if !Self::data_available(mmap, pos_cache, consumer, |_consumer| {}) {
return None;
}
match consumer.read_item(data_pages, *mask) {
Item::Busy => return None,
Item::Discard { len } => {
consumer.consume(len);
consumer.commit();
}
Item::Data(data) => {
return Some(RingBufItem { data, consumer });
}
}
}
}

fn data_pages(mmap: &MMap, data_offset: usize) -> &[u8] {
let mmap_data = mmap.as_ref();
#[expect(
clippy::panic,
reason = "invalid ring buffer layout is a fatal internal error"
)]
let data_pages = mmap_data.get(*data_offset..).unwrap_or_else(|| {
mmap_data.get(data_offset..).unwrap_or_else(|| {
panic!(
"offset {} out of bounds, data len {}",
data_offset,
mmap_data.len()
)
});
while data_available(mmap, pos_cache, consumer) {
match read_item(data_pages, *mask, consumer) {
Item::Busy => return None,
Item::Discard { len } => consumer.consume(len),
Item::Data(data) => return Some(RingBufItem { data, consumer }),
}
}
return None;
})
}

enum Item<'a> {
Busy,
Discard { len: usize },
Data(&'a [u8]),
fn data_available<F>(
producer: &MMap,
producer_cache: &mut usize,
consumer: &mut ConsumerPos,
mut flush_consumer: F,
) -> bool
where
F: FnMut(&mut ConsumerPos),
{
let ConsumerPos { pos, metadata: _ } = consumer;
// Refresh the producer position cache if it appears that the consumer is caught up
// with the producer position.
if pos == producer_cache {
// Persist the consumer position to avoid starving the producer.
flush_consumer(consumer);
*producer_cache = load_producer_pos(producer);
}

fn data_available(
producer: &MMap,
producer_cache: &mut usize,
consumer: &ConsumerPos,
) -> bool {
let ConsumerPos { pos: consumer, .. } = consumer;
// Refresh the producer position cache if it appears that the consumer is caught up
// with the producer position.
if consumer == producer_cache {
*producer_cache = load_producer_pos(producer);
}
// Note that we don't compare the order of the values because the producer position may
// overflow u32 and wrap around to 0. Instead we just compare equality and assume that
// the consumer position is always logically less than the producer position.
//
// Note also that the kernel, at the time of writing [1], doesn't seem to handle this
// overflow correctly at all, and it's not clear that one can produce events after the
// producer position has wrapped around.
//
// [1]: https://github.com/torvalds/linux/blob/4b810bf0/kernel/bpf/ringbuf.c#L434-L440
let ConsumerPos { pos, metadata: _ } = consumer;
pos != producer_cache
}

// Note that we don't compare the order of the values because the producer position may
// overflow u32 and wrap around to 0. Instead we just compare equality and assume that
// the consumer position is always logically less than the producer position.
//
// Note also that the kernel, at the time of writing [1], doesn't seem to handle this
// overflow correctly at all, and it's not clear that one can produce events after the
// producer position has wrapped around.
//
// [1]: https://github.com/torvalds/linux/blob/4b810bf0/kernel/bpf/ringbuf.c#L434-L440
consumer != producer_cache
fn try_fold<B, C, F>(
&mut self,
consumer: &mut ConsumerPos,
init: C,
mut f: F,
) -> ControlFlow<B, C>
where
F: FnMut(C, &[u8]) -> ControlFlow<B, C>,
{
let Self {
mmap,
data_offset,
pos_cache,
mask,
} = self;
let data_pages = Self::data_pages(mmap, *data_offset);
let mut acc = init;
let mut advanced = false;
let consume = |consumer: &mut ConsumerPos, advanced: &mut bool, len: usize| {
consumer.consume(len);
*advanced = true;
};
let flush = |consumer: &mut ConsumerPos, advanced: &mut bool| -> bool {
let flush = std::mem::replace(advanced, false);
if flush {
consumer.commit();
}
flush
};
// This must be deferred in case `f` panics.
let mut guard = scopeguard::guard((consumer, &mut advanced), |(consumer, advanced)| {
flush(consumer, advanced);
});
loop {
let (consumer, advanced) = &mut *guard;
if !Self::data_available(mmap, pos_cache, consumer, |consumer| {
flush(consumer, advanced);
}) {
break;
}
match consumer.read_item(data_pages, *mask) {
Item::Busy => {
if !flush(consumer, advanced) {
break;
}
}
Item::Discard { len } => consume(consumer, advanced, len),
Item::Data(data) => {
// This must be deferred in case `f` panics.
scopeguard::defer! { consume(consumer, advanced, data.len()) };
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If `f` panics, do you definitely want to consume the item? I think it's a reasonable contract for this use case because panicking in a loop is bad, but it is worth documenting, and I could see the decision being pushed into `f` if needed (e.g. the caller of this function tracks in some way whether `f` panicked).

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a reasonable thing to call out. It also suggests that you might want control over consumption - you can imagine a combinator like TakeWhile that needs to peek and thus may elect to halt iteration without consuming the last item.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually the take_while docs explicitly mention this; the rejected element is itself consumed.

https://doc.rust-lang.org/stable/std/iter/trait.Iterator.html#method.take_while

Still, this is unlike a normal iterator because it's yielding borrowed data. Do you think it would be useful to allow the caller to explicitly (or implicitly, e.g. via panic) decide to not consume data?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's not worth doing anything about. If some consumer is so worried about panicking they can copy the data out or something like that.

match f(acc, data) {
ControlFlow::Continue(next) => {
acc = next;
}
ControlFlow::Break(v) => {
return ControlFlow::Break(v);
}
}
}
}
}
ControlFlow::Continue(acc)
}
}

/// The result of inspecting the ring buffer entry at the consumer position.
enum Item<'a> {
/// The entry's header has the busy bit set; callers stop reading at this
/// entry rather than skipping past it.
Busy,
/// The entry's header has the discard bit set. `len` is the length taken
/// from the header; callers advance past the entry without yielding it.
Discard { len: usize },
/// A readable entry: the payload bytes that follow the entry header in the
/// data pages.
Data(&'a [u8]),
}

fn read_item<'data>(data: &'data [u8], mask: u32, pos: &ConsumerPos) -> Item<'data> {
let ConsumerPos { pos, .. } = pos;
let offset = pos & usize::try_from(mask).unwrap();
#[expect(
clippy::panic,
reason = "invalid ring buffer layout is a fatal internal error"
)]
let must_get_data = |offset, len| {
data.get(offset..offset + len).unwrap_or_else(|| {
panic!("{:?} not in {:?}", offset..offset + len, 0..data.len())
})
};
let header_ptr: *const AtomicU32 = must_get_data(offset, size_of::<AtomicU32>())
.as_ptr()
.cast();
// Pair the kernel's SeqCst write (implies Release) [1] with an Acquire load. This
// ensures data written by the producer will be visible.
//
// [1]: https://github.com/torvalds/linux/blob/eb26cbb1/kernel/bpf/ringbuf.c#L488
let header = unsafe { &*header_ptr }.load(Ordering::Acquire);
if header & BPF_RINGBUF_BUSY_BIT != 0 {
Item::Busy
impl ConsumerPos {
fn read_item<'data>(&self, data: &'data [u8], mask: u32) -> Item<'data> {
let Self { pos, metadata: _ } = self;
let offset = pos & usize::try_from(mask).unwrap();
#[expect(
clippy::panic,
reason = "invalid ring buffer layout is a fatal internal error"
)]
let must_get_data = |offset, len| {
data.get(offset..offset + len)
.unwrap_or_else(|| panic!("{:?} not in {:?}", offset..offset + len, 0..data.len()))
};
let header_ptr: *const AtomicU32 = must_get_data(offset, size_of::<AtomicU32>())
.as_ptr()
.cast();
// Pair the kernel's SeqCst write (implies Release) [1] with an Acquire load. This
// ensures data written by the producer will be visible.
//
// [1]: https://github.com/torvalds/linux/blob/eb26cbb1/kernel/bpf/ringbuf.c#L488
let header = unsafe { &*header_ptr }.load(Ordering::Acquire);
if header & BPF_RINGBUF_BUSY_BIT != 0 {
Item::Busy
} else {
let len = usize::try_from(header & mask).unwrap();
if header & BPF_RINGBUF_DISCARD_BIT != 0 {
Item::Discard { len }
} else {
let len = usize::try_from(header & mask).unwrap();
if header & BPF_RINGBUF_DISCARD_BIT != 0 {
Item::Discard { len }
} else {
let data_offset = offset + usize::try_from(BPF_RINGBUF_HDR_SZ).unwrap();
let data = must_get_data(data_offset, len);
Item::Data(data)
}
let data_offset = offset + usize::try_from(BPF_RINGBUF_HDR_SZ).unwrap();
let data = must_get_data(data_offset, len);
Item::Data(data)
}
}
}
Expand Down
Loading
Loading