diff --git a/aya-log/src/lib.rs b/aya-log/src/lib.rs index 3b6040c7e..b067dcb2f 100644 --- a/aya-log/src/lib.rs +++ b/aya-log/src/lib.rs @@ -182,9 +182,7 @@ impl EbpfLogger { /// Reads log records from eBPF and writes them to the logger. pub fn flush(&mut self) { let Self { ring_buf, logger } = self; - while let Some(buf) = ring_buf.next() { - log_buf(buf.as_ref(), logger).unwrap(); - } + ring_buf.for_each(|buf| log_buf(buf, logger).unwrap()); } } diff --git a/aya/Cargo.toml b/aya/Cargo.toml index 5d84f154b..e14995752 100644 --- a/aya/Cargo.toml +++ b/aya/Cargo.toml @@ -26,6 +26,7 @@ libc = { workspace = true } log = { workspace = true } object = { workspace = true, features = ["elf", "read_core", "std", "write"] } once_cell = { workspace = true } +scopeguard = { workspace = true } thiserror = { workspace = true } [dev-dependencies] diff --git a/aya/src/maps/ring_buf.rs b/aya/src/maps/ring_buf.rs index 94c8eda8c..512247b47 100644 --- a/aya/src/maps/ring_buf.rs +++ b/aya/src/maps/ring_buf.rs @@ -6,8 +6,9 @@ use std::{ borrow::Borrow, + convert::Infallible, fmt::{self, Debug, Formatter}, - ops::Deref, + ops::{ControlFlow, Deref}, os::fd::{AsFd, AsRawFd, BorrowedFd, RawFd}, sync::atomic::{AtomicU32, AtomicUsize, Ordering}, }; @@ -37,7 +38,9 @@ use crate::{ /// /// To receive events you need to: /// * Construct [`RingBuf`] using [`RingBuf::try_from`]. -/// * Call [`RingBuf::next`] to poll events from the [`RingBuf`]. +/// * Call [`RingBuf::next`] to read one entry at a time, or +/// [`RingBuf::try_fold`], [`RingBuf::fold`], or [`RingBuf::for_each`] to +/// process available entries in bulk. /// /// To receive async notifications of data availability, you may construct an /// [`tokio::io::unix::AsyncFd`] from the [`RingBuf`]'s file descriptor and poll it for readiness. @@ -71,9 +74,9 @@ use crate::{ /// loop { /// let mut guard = poll.readable(); /// let ring_buf = guard.inner_mut(); -/// while let Some(item) = ring_buf.next() { +/// ring_buf.for_each(|item| { /// println!("received: {:?}", item); -/// } +/// }); /// guard.clear_ready(); /// } /// # Ok::<(), aya::EbpfError>(()) @@ -128,10 +131,60 @@ impl RingBuf { )] pub fn next(&mut self) -> Option> { let Self { - consumer, producer, .. + map: _, + consumer, + producer, } = self; producer.next(consumer) } + + /// Processes entries in the ring buffer with `f`. + /// + /// For each available data entry, `f` receives the accumulator and + /// entry: + /// * [`ControlFlow::Continue(next)`](ControlFlow::Continue) keeps draining with `next`. + /// * [`ControlFlow::Break(break_value)`](ControlFlow::Break) stops early and returns `break_value`. + /// + /// If the ring buffer is fully drained, returns [`ControlFlow::Continue`] + /// containing the final accumulator. + pub fn try_fold(&mut self, init: C, f: F) -> ControlFlow + where + F: FnMut(C, &[u8]) -> ControlFlow, + { + let Self { + map: _, + consumer, + producer, + } = self; + producer.try_fold(consumer, init, f) + } + + /// Processes entries in the ring buffer with `f`. + /// + /// For each available data entry, `f` receives the accumulator and entry, + /// and returns the next accumulator. + /// + /// Unlike [`RingBuf::try_fold`], this function cannot short-circuit: it + /// always processes entries until the ring buffer is fully drained, then + /// returns the final accumulator. + pub fn fold(&mut self, init: C, mut f: F) -> C + where + F: FnMut(C, &[u8]) -> C, + { + let ControlFlow::Continue(acc) = self + .try_fold::(init, |acc, data| ControlFlow::Continue(f(acc, data))); + acc + } + + /// Processes entries in the ring buffer with `f`. + /// + /// For each available data entry, `f` receives the entry. + pub fn for_each(&mut self, mut f: F) + where + F: FnMut(&[u8]), + { + self.fold((), |(), data| f(data)) + } } impl> AsFd for RingBuf { @@ -151,7 +204,7 @@ impl> AsRawFd for RingBuf { } } -/// The current outstanding item read from the ringbuf. +/// An item read from the ring buffer. pub struct RingBufItem<'a> { data: &'a [u8], consumer: &'a mut ConsumerPos, @@ -161,15 +214,16 @@ impl Deref for RingBufItem<'_> { type Target = [u8]; fn deref(&self) -> &Self::Target { - let Self { data, .. } = self; + let Self { data, consumer: _ } = self; data } } impl Drop for RingBufItem<'_> { fn drop(&mut self) { - let Self { consumer, data } = self; - consumer.consume(data.len()) + let Self { data, consumer } = self; + consumer.consume(data.len()); + consumer.commit(); } } @@ -228,10 +282,13 @@ impl ConsumerPos { } fn consume(&mut self, len: usize) { - let Self { pos, metadata } = self; + let Self { pos, metadata: _ } = self; *pos += (usize::try_from(BPF_RINGBUF_HDR_SZ).unwrap() + len).next_multiple_of(8); + } + fn commit(&self) { + let Self { pos, metadata } = self; // Write operation needs to be properly ordered with respect to the producer committing new // data to the ringbuf. The producer uses xchg (SeqCst) to commit new data [1]. The producer // reads the consumer offset after clearing the busy bit on a new entry [2]. By using SeqCst @@ -321,89 +378,171 @@ impl ProducerData { pos_cache, mask, } = self; - let mmap = &*mmap; + let data_pages = Self::data_pages(mmap, *data_offset); + loop { + if !Self::data_available(mmap, pos_cache, consumer, |_consumer| {}) { + return None; + } + match consumer.read_item(data_pages, *mask) { + Item::Busy => return None, + Item::Discard { len } => { + consumer.consume(len); + consumer.commit(); + } + Item::Data(data) => { + return Some(RingBufItem { data, consumer }); + } + } + } + } + + fn data_pages(mmap: &MMap, data_offset: usize) -> &[u8] { let mmap_data = mmap.as_ref(); #[expect( clippy::panic, reason = "invalid ring buffer layout is a fatal internal error" )] - let data_pages = mmap_data.get(*data_offset..).unwrap_or_else(|| { + mmap_data.get(data_offset..).unwrap_or_else(|| { panic!( "offset {} out of bounds, data len {}", data_offset, mmap_data.len() ) - }); - while data_available(mmap, pos_cache, consumer) { - match read_item(data_pages, *mask, consumer) { - Item::Busy => return None, - Item::Discard { len } => consumer.consume(len), - Item::Data(data) => return Some(RingBufItem { data, consumer }), - } - } - return None; + }) + } - enum Item<'a> { - Busy, - Discard { len: usize }, - Data(&'a [u8]), + fn data_available( + producer: &MMap, + producer_cache: &mut usize, + consumer: &mut ConsumerPos, + mut flush_consumer: F, + ) -> bool + where + F: FnMut(&mut ConsumerPos), + { + let ConsumerPos { pos, metadata: _ } = consumer; + // Refresh the producer position cache if it appears that the consumer is caught up + // with the producer position. + if pos == producer_cache { + // Persist the consumer position to avoid starving the producer. + flush_consumer(consumer); + *producer_cache = load_producer_pos(producer); } - fn data_available( - producer: &MMap, - producer_cache: &mut usize, - consumer: &ConsumerPos, - ) -> bool { - let ConsumerPos { pos: consumer, .. } = consumer; - // Refresh the producer position cache if it appears that the consumer is caught up - // with the producer position. - if consumer == producer_cache { - *producer_cache = load_producer_pos(producer); - } + // Note that we don't compare the order of the values because the producer position may + // overflow u32 and wrap around to 0. Instead we just compare equality and assume that + // the consumer position is always logically less than the producer position. + // + // Note also that the kernel, at the time of writing [1], doesn't seem to handle this + // overflow correctly at all, and it's not clear that one can produce events after the + // producer position has wrapped around. + // + // [1]: https://github.com/torvalds/linux/blob/4b810bf0/kernel/bpf/ringbuf.c#L434-L440 + let ConsumerPos { pos, metadata: _ } = consumer; + pos != producer_cache + } - // Note that we don't compare the order of the values because the producer position may - // overflow u32 and wrap around to 0. Instead we just compare equality and assume that - // the consumer position is always logically less than the producer position. - // - // Note also that the kernel, at the time of writing [1], doesn't seem to handle this - // overflow correctly at all, and it's not clear that one can produce events after the - // producer position has wrapped around. - // - // [1]: https://github.com/torvalds/linux/blob/4b810bf0/kernel/bpf/ringbuf.c#L434-L440 - consumer != producer_cache + fn try_fold( + &mut self, + consumer: &mut ConsumerPos, + init: C, + mut f: F, + ) -> ControlFlow + where + F: FnMut(C, &[u8]) -> ControlFlow, + { + let Self { + mmap, + data_offset, + pos_cache, + mask, + } = self; + let data_pages = Self::data_pages(mmap, *data_offset); + let mut acc = init; + let mut advanced = false; + let consume = |consumer: &mut ConsumerPos, advanced: &mut bool, len: usize| { + consumer.consume(len); + *advanced = true; + }; + let flush = |consumer: &mut ConsumerPos, advanced: &mut bool| -> bool { + let flush = std::mem::replace(advanced, false); + if flush { + consumer.commit(); + } + flush + }; + // This must be deferred in case `f` panics. + let mut guard = scopeguard::guard((consumer, &mut advanced), |(consumer, advanced)| { + flush(consumer, advanced); + }); + loop { + let (consumer, advanced) = &mut *guard; + if !Self::data_available(mmap, pos_cache, consumer, |consumer| { + flush(consumer, advanced); + }) { + break; + } + match consumer.read_item(data_pages, *mask) { + Item::Busy => { + if !flush(consumer, advanced) { + break; + } + } + Item::Discard { len } => consume(consumer, advanced, len), + Item::Data(data) => { + // This must be deferred in case `f` panics. + scopeguard::defer! { consume(consumer, advanced, data.len()) }; + match f(acc, data) { + ControlFlow::Continue(next) => { + acc = next; + } + ControlFlow::Break(v) => { + return ControlFlow::Break(v); + } + } + } + } } + ControlFlow::Continue(acc) + } +} + +enum Item<'a> { + Busy, + Discard { len: usize }, + Data(&'a [u8]), +} - fn read_item<'data>(data: &'data [u8], mask: u32, pos: &ConsumerPos) -> Item<'data> { - let ConsumerPos { pos, .. } = pos; - let offset = pos & usize::try_from(mask).unwrap(); - #[expect( - clippy::panic, - reason = "invalid ring buffer layout is a fatal internal error" - )] - let must_get_data = |offset, len| { - data.get(offset..offset + len).unwrap_or_else(|| { - panic!("{:?} not in {:?}", offset..offset + len, 0..data.len()) - }) - }; - let header_ptr: *const AtomicU32 = must_get_data(offset, size_of::()) - .as_ptr() - .cast(); - // Pair the kernel's SeqCst write (implies Release) [1] with an Acquire load. This - // ensures data written by the producer will be visible. - // - // [1]: https://github.com/torvalds/linux/blob/eb26cbb1/kernel/bpf/ringbuf.c#L488 - let header = unsafe { &*header_ptr }.load(Ordering::Acquire); - if header & BPF_RINGBUF_BUSY_BIT != 0 { - Item::Busy +impl ConsumerPos { + fn read_item<'data>(&self, data: &'data [u8], mask: u32) -> Item<'data> { + let Self { pos, metadata: _ } = self; + let offset = pos & usize::try_from(mask).unwrap(); + #[expect( + clippy::panic, + reason = "invalid ring buffer layout is a fatal internal error" + )] + let must_get_data = |offset, len| { + data.get(offset..offset + len) + .unwrap_or_else(|| panic!("{:?} not in {:?}", offset..offset + len, 0..data.len())) + }; + let header_ptr: *const AtomicU32 = must_get_data(offset, size_of::()) + .as_ptr() + .cast(); + // Pair the kernel's SeqCst write (implies Release) [1] with an Acquire load. This + // ensures data written by the producer will be visible. + // + // [1]: https://github.com/torvalds/linux/blob/eb26cbb1/kernel/bpf/ringbuf.c#L488 + let header = unsafe { &*header_ptr }.load(Ordering::Acquire); + if header & BPF_RINGBUF_BUSY_BIT != 0 { + Item::Busy + } else { + let len = usize::try_from(header & mask).unwrap(); + if header & BPF_RINGBUF_DISCARD_BIT != 0 { + Item::Discard { len } } else { - let len = usize::try_from(header & mask).unwrap(); - if header & BPF_RINGBUF_DISCARD_BIT != 0 { - Item::Discard { len } - } else { - let data_offset = offset + usize::try_from(BPF_RINGBUF_HDR_SZ).unwrap(); - let data = must_get_data(data_offset, len); - Item::Data(data) - } + let data_offset = offset + usize::try_from(BPF_RINGBUF_HDR_SZ).unwrap(); + let data = must_get_data(data_offset, len); + Item::Data(data) } } } diff --git a/test/integration-test/src/tests/load.rs b/test/integration-test/src/tests/load.rs index d42eee9b7..cc6e9b7ad 100644 --- a/test/integration-test/src/tests/load.rs +++ b/test/integration-test/src/tests/load.rs @@ -67,10 +67,13 @@ fn ringbuffer_btf_map() { trigger_bpf_program(); - let item = ring_buf.next().unwrap(); - let item: [u8; 4] = (*item).try_into().unwrap(); - let val = u32::from_ne_bytes(item); - assert_eq!(val, 0xdeadbeef); + let mut items = Vec::new(); + ring_buf.for_each(|item| { + let item = item.try_into().unwrap(); + let val = u32::from_ne_bytes(item); + items.push(val); + }); + assert_eq!(items, &[0xdeadbeef]); } #[test_log::test] diff --git a/test/integration-test/src/tests/ring_buf.rs b/test/integration-test/src/tests/ring_buf.rs index 613dd0464..cba5ff41d 100644 --- a/test/integration-test/src/tests/ring_buf.rs +++ b/test/integration-test/src/tests/ring_buf.rs @@ -1,4 +1,5 @@ use std::{ + ops::ControlFlow, os::fd::AsRawFd as _, path::Path, sync::{ @@ -9,7 +10,6 @@ use std::{ time::Duration, }; -use anyhow::Context as _; use assert_matches::assert_matches; use aya::{ Ebpf, EbpfLoader, @@ -151,21 +151,13 @@ fn ring_buf(n: usize) { } } - let mut seen = Vec::::new(); - while seen.len() < expected.len() { - if let Some(read) = ring_buf.next() { - let read: [u8; 8] = (*read) - .try_into() - .with_context(|| format!("data: {:?}", read.len())) - .unwrap(); - let arg = u64::from_ne_bytes(read); - assert_eq!(arg % 2, 0, "got {arg} from probe"); - seen.push(arg); - } - } - - // Make sure that there is nothing else in the ring_buf. - assert_matches!(ring_buf.next(), None); + let mut seen = Vec::with_capacity(expected.len()); + ring_buf.for_each(|read| { + let read = read.try_into().unwrap(); + let arg = u64::from_ne_bytes(read); + assert_eq!(arg % 2, 0, "got {arg} from probe"); + seen.push(arg); + }); // Ensure that the data that was read matches what was passed, and the rejected count was set // properly. @@ -207,13 +199,13 @@ fn ring_buf_mismatch_size( .unwrap(); trigger(value.into()); - { - let read = ring_buf.next().unwrap(); + let mut items = Vec::new(); + ring_buf.for_each(|read| { assert_eq!(read.len(), size_of::()); - let decoded = decode(read.as_ref()); - assert_eq!(decoded, value); - } - assert_matches!(ring_buf.next(), None); + let decoded = decode(read); + items.push(decoded); + }); + assert_eq!(items, &[value]); } #[test_log::test] @@ -226,8 +218,8 @@ fn ring_buf_mismatch_small() { ring_buf_trigger_ebpf_program, value, |read| { - let bytes: [u8; 2] = read.try_into().unwrap(); - u16::from_ne_bytes(bytes) + let read = read.try_into().unwrap(); + u16::from_ne_bytes(read) }, ); } @@ -242,8 +234,8 @@ fn ring_buf_mismatch_large() { ring_buf_trigger_ebpf_program, value, |read| { - let bytes: [u8; 8] = read.try_into().unwrap(); - u64::from_ne_bytes(bytes) + let read = read.try_into().unwrap(); + u64::from_ne_bytes(read) }, ); } @@ -271,15 +263,12 @@ async fn ring_buf_async_with_drops() { // Construct an AsyncFd from the RingBuf in order to receive readiness notifications. let mut seen = 0; let mut process_ring_buf = |ring_buf: &mut RingBuf<_>| { - while let Some(read) = ring_buf.next() { - let read: [u8; 8] = (*read) - .try_into() - .with_context(|| format!("data: {:?}", read.len())) - .unwrap(); + ring_buf.for_each(|read| { + let read = read.try_into().unwrap(); let arg = u64::from_ne_bytes(read); assert_eq!(arg % 2, 0, "got {arg} from probe"); seen += 1; - } + }); }; let mut writer = futures::future::try_join_all(data.chunks(8).map(ToOwned::to_owned).map(|v| { @@ -320,9 +309,6 @@ async fn ring_buf_async_with_drops() { } } - // Make sure that there is nothing else in the ring_buf. - assert_matches!(async_fd.into_inner().next(), None); - let max_dropped: u64 = u64::try_from(data.len().saturating_sub(RING_BUF_MAX_ENTRIES - 1)).unwrap(); let max_seen = u64::try_from(data.iter().filter(|v| *v % 2 == 0).count()).unwrap(); @@ -393,24 +379,18 @@ async fn ring_buf_async_no_drop() { while seen.len() < expected_len { let mut guard = async_fd.readable_mut().await.unwrap(); let ring_buf = guard.get_inner_mut(); - while let Some(read) = ring_buf.next() { - let read: [u8; 8] = (*read) - .try_into() - .with_context(|| format!("data: {:?}", read.len())) - .unwrap(); + ring_buf.for_each(|read| { + let read = read.try_into().unwrap(); let arg = u64::from_ne_bytes(read); seen.push(arg); - } + }); guard.clear_ready(); } - (seen, async_fd.into_inner()) + seen }; - let (writer, (seen, mut ring_buf)) = futures::future::join(writer, reader).await; + let (writer, seen) = futures::future::join(writer, reader).await; writer.unwrap(); - // Make sure that there is nothing else in the ring_buf. - assert_matches!(ring_buf.next(), None); - // Ensure that the data that was read matches what was passed. assert_eq!(&seen, &expected); let Registers { dropped, rejected } = regs.get(&0, 0).unwrap(); @@ -448,10 +428,10 @@ fn ring_buf_epoll_wakeup() { let writer = WriterThread::spawn(); while total_events < WriterThread::NUM_MESSAGES { epoll::wait(epoll_fd, -1, &mut epoll_event_buf).unwrap(); - while let Some(read) = ring_buf.next() { + ring_buf.for_each(|read| { assert_eq!(read.len(), 8); total_events += 1; - } + }); } writer.join(); } @@ -473,11 +453,11 @@ async fn ring_buf_asyncfd_events() { let writer = WriterThread::spawn(); while total_events < WriterThread::NUM_MESSAGES { let mut guard = async_fd.readable_mut().await.unwrap(); - let rb = guard.get_inner_mut(); - while let Some(read) = rb.next() { + let ring_buf = guard.get_inner_mut(); + ring_buf.for_each(|read| { assert_eq!(read.len(), 8); total_events += 1; - } + }); guard.clear_ready(); } writer.join(); @@ -551,11 +531,22 @@ async fn ring_buf_pinned() { ring_buf_trigger_ebpf_program(v); } let (to_read_before_reopen, to_read_after_reopen) = to_write_before_reopen.split_at(2); - for v in to_read_before_reopen { - let item = ring_buf.next().unwrap(); - let item: [u8; 8] = item.as_ref().try_into().unwrap(); - assert_eq!(item, v.to_ne_bytes()); - } + + assert_matches!( + ring_buf.try_fold(to_read_before_reopen, |to_read, item| { + assert_matches!(to_read, [v, to_read @ ..] => { + let item: [u8; 8] = item.as_ref().try_into().unwrap(); + assert_eq!(item, v.to_ne_bytes()); + if to_read.is_empty() { + ControlFlow::Break(()) + } else { + ControlFlow::Continue(to_read) + } + }) + }), + ControlFlow::Break(()) + ); + drop(ring_buf); drop(bpf); @@ -579,15 +570,17 @@ async fn ring_buf_pinned() { } // Read both the data that was written before the ring buffer was reopened and the data that // was written after it was reopened. - for v in to_read_after_reopen - .iter() - .chain(to_write_after_reopen.iter()) - { - let item = ring_buf.next().unwrap(); - let item: [u8; 8] = item.as_ref().try_into().unwrap(); - assert_eq!(item, v.to_ne_bytes()); - } - // Make sure there is nothing else in the ring buffer. - assert_matches!(ring_buf.next(), None); + let mut iter = ring_buf.fold( + to_read_after_reopen + .iter() + .chain(to_write_after_reopen.iter()), + |mut iter, item| { + let v = iter.next().unwrap(); + let item = item.try_into().unwrap(); + assert_eq!(u64::from_ne_bytes(item), *v); + iter + }, + ); + assert_eq!(iter.next(), None); } } diff --git a/test/integration-test/src/tests/uprobe_cookie.rs b/test/integration-test/src/tests/uprobe_cookie.rs index 1a7f08d22..25a406a1a 100644 --- a/test/integration-test/src/tests/uprobe_cookie.rs +++ b/test/integration-test/src/tests/uprobe_cookie.rs @@ -59,15 +59,10 @@ fn test_uprobe_cookie() { const EXP: &[u64] = &[1, 2, 1, 3]; let mut seen = Vec::new(); - while let Some(read) = ring_buf.next() { - let read = read.as_ref(); - match read.try_into() { - Ok(read) => seen.push(u64::from_ne_bytes(read)), - Err(std::array::TryFromSliceError { .. }) => { - panic!("invalid ring buffer data: {read:x?}") - } - } - } + ring_buf.for_each(|read| { + let read = read.try_into().unwrap(); + seen.push(u64::from_ne_bytes(read)); + }); assert_eq!(seen, EXP); } diff --git a/xtask/public-api/aya.txt b/xtask/public-api/aya.txt index d6acf99a7..2df97b771 100644 --- a/xtask/public-api/aya.txt +++ b/xtask/public-api/aya.txt @@ -345,7 +345,10 @@ impl core::panic::unwind_safe::UnwindSafe for aya::maps::queue::Queue impl aya::maps::ring_buf::RingBuf +pub fn aya::maps::ring_buf::RingBuf::fold(&mut self, init: C, f: F) -> C where F: core::ops::function::FnMut(C, &[u8]) -> C +pub fn aya::maps::ring_buf::RingBuf::for_each(&mut self, f: F) where F: core::ops::function::FnMut(&[u8]) pub fn aya::maps::ring_buf::RingBuf::next(&mut self) -> core::option::Option> +pub fn aya::maps::ring_buf::RingBuf::try_fold(&mut self, init: C, f: F) -> core::ops::control_flow::ControlFlow where F: core::ops::function::FnMut(C, &[u8]) -> core::ops::control_flow::ControlFlow impl core::convert::TryFrom for aya::maps::ring_buf::RingBuf pub type aya::maps::ring_buf::RingBuf::Error = aya::maps::MapError pub fn aya::maps::ring_buf::RingBuf::try_from(map: aya::maps::Map) -> core::result::Result @@ -1494,7 +1497,10 @@ impl core::panic::unwind_safe::RefUnwindSafe for aya::maps::ReusePortSockArra impl core::panic::unwind_safe::UnwindSafe for aya::maps::ReusePortSockArray where T: core::panic::unwind_safe::UnwindSafe pub struct aya::maps::RingBuf impl aya::maps::ring_buf::RingBuf +pub fn aya::maps::ring_buf::RingBuf::fold(&mut self, init: C, f: F) -> C where F: core::ops::function::FnMut(C, &[u8]) -> C +pub fn aya::maps::ring_buf::RingBuf::for_each(&mut self, f: F) where F: core::ops::function::FnMut(&[u8]) pub fn aya::maps::ring_buf::RingBuf::next(&mut self) -> core::option::Option> +pub fn aya::maps::ring_buf::RingBuf::try_fold(&mut self, init: C, f: F) -> core::ops::control_flow::ControlFlow where F: core::ops::function::FnMut(C, &[u8]) -> core::ops::control_flow::ControlFlow impl core::convert::TryFrom for aya::maps::ring_buf::RingBuf pub type aya::maps::ring_buf::RingBuf::Error = aya::maps::MapError pub fn aya::maps::ring_buf::RingBuf::try_from(map: aya::maps::Map) -> core::result::Result