diff --git a/btf/testdata/btf_testmod.btf b/btf/testdata/btf_testmod.btf index a774860a8..df2348370 100644 Binary files a/btf/testdata/btf_testmod.btf and b/btf/testdata/btf_testmod.btf differ diff --git a/btf/testdata/vmlinux.btf.gz b/btf/testdata/vmlinux.btf.gz index c35714299..f2789fdd7 100644 Binary files a/btf/testdata/vmlinux.btf.gz and b/btf/testdata/vmlinux.btf.gz differ diff --git a/ringbuf/reader.go b/ringbuf/reader.go index 0864f2ae1..24d38e63d 100644 --- a/ringbuf/reader.go +++ b/ringbuf/reader.go @@ -5,9 +5,12 @@ import ( "fmt" "os" "sync" + "sync/atomic" "time" "unsafe" + "iter" + "github.com/cilium/ebpf" "github.com/cilium/ebpf/internal/platform" "github.com/cilium/ebpf/internal/sys" @@ -40,6 +43,11 @@ func (rh *ringbufHeader) dataLen() int { } type Record struct { + // RawSample contains the raw bytes of a ringbuf record. + // + // When obtained via [Reader.Records] or [Reader.RecordsInto], RawSample is a zero-copy view into the + // underlying mmap. It is only valid until the iterator yields the next + // record or terminates. Callers must copy the data if they need to retain it. RawSample []byte // The minimum number of bytes remaining in the ring buffer after this Record has been read. @@ -144,6 +152,59 @@ func (r *Reader) Read() (Record, error) { // ReadInto is like Read except that it allows reusing Record and associated buffers. func (r *Reader) ReadInto(rec *Record) error { + return r.readLocked(func() error { + return r.ring.readRecord(rec) + }) +} + +// Records iterates over records in the reader until [Reader.Close] is called. Any read error is yielded once with a nil Record, after which iteration stops. +// +// Record.RawSample is only valid until the loop body finishes with that record. Callers +// must copy the data if it needs to outlive the current iteration. +// +// This convenience wrapper allocates a single Record once. To fully avoid +// allocations, use [Reader.RecordsInto] and pass in a reusable Record. 
+func (r *Reader) Records() iter.Seq2[*Record, error] { + rec := Record{} + return r.RecordsInto(&rec) +} + +// RecordsInto is like Records but yields the caller-provided rec on every iteration, overwriting rec.RawSample with a view into the ring rather than reusing its buffer. NOTE(review): rec.RawSample still aliases the ring afterwards; reset it before passing rec to ReadInto, which may otherwise copy into that memory - confirm. +func (r *Reader) RecordsInto(rec *Record) iter.Seq2[*Record, error] { + return func(yield func(*Record, error) bool) { + var ( + sample []byte + remaining int + nextCons uintptr + ) + + for { + err := r.readLocked(func() error { + var err error + sample, remaining, nextCons, err = r.ring.readSample() + return err + }) + if err != nil { + yield(nil, err) + return + } + + // Limit cap to len so append can't write past the record and corrupt the ring. + rec.RawSample = sample[:len(sample):len(sample)] + rec.Remaining = remaining + + if !yield(rec, nil) { + atomic.StoreUintptr(r.ring.cons_pos, nextCons) + return + } + + atomic.StoreUintptr(r.ring.cons_pos, nextCons) + } + } +} + +// readLocked holds r.mu while driving the polling / data-availability loop shared by Record reads; read is retried while it returns errBusy. +func (r *Reader) readLocked(read func() error) error { r.mu.Lock() defer r.mu.Unlock() @@ -171,9 +232,7 @@ func (r *Reader) ReadInto(rec *Record) error { } for { - err := r.ring.readRecord(rec) - // Not using errors.Is which is quite a bit slower - // For a tight loop it might make a difference + err := read() if err == errBusy { continue } diff --git a/ringbuf/reader_test.go b/ringbuf/reader_test.go index 89de15df0..88dbca69c 100644 --- a/ringbuf/reader_test.go +++ b/ringbuf/reader_test.go @@ -384,3 +384,34 @@ func BenchmarkReadInto(b *testing.B) { } } } + +func TestRecordsIterator(t *testing.T) { + testutils.SkipOnOldKernel(t, "5.8", "BPF ring buffer") + + prog, events := mustOutputSamplesProg(t, + sampleMessage{size: 5, flags: 0}, + sampleMessage{size: 7, flags: 0}, + ) + mustRun(t, prog) + + rd, err := NewReader(events) + if err != nil { + t.Fatal(err) + } + defer rd.Close() + + var seen [][]byte + for rec, err := range rd.Records() { + if err != nil { + t.Fatalf("iteration error: %v", err) + } + seen = append(seen, 
append([]byte(nil), rec.RawSample...)) + if len(seen) == 2 { + break + } + } + + qt.Assert(t, qt.Equals(len(seen), 2)) + qt.Assert(t, qt.DeepEquals(seen[0], []byte{1, 2, 3, 4, 4})) + qt.Assert(t, qt.DeepEquals(seen[1], []byte{1, 2, 3, 4, 4, 3, 2})) +} diff --git a/ringbuf/ring.go b/ringbuf/ring.go index abb704cd5..66b87aaa0 100644 --- a/ringbuf/ring.go +++ b/ringbuf/ring.go @@ -41,16 +41,17 @@ func (rr *ringReader) AvailableBytes() uint64 { return uint64(prod - cons) } -// Read a record from an event ring. -func (rr *ringReader) readRecord(rec *Record) error { +// readSample returns a zero-copy view into the next sample, together with the +// consumer position that the caller should store in cons_pos to release the data. +func (rr *ringReader) readSample() (sample []byte, remaining int, nextCons uintptr, err error) { prod := atomic.LoadUintptr(rr.prod_pos) cons := atomic.LoadUintptr(rr.cons_pos) for { if remaining := prod - cons; remaining == 0 { - return errEOR + return nil, 0, 0, errEOR } else if remaining < sys.BPF_RINGBUF_HDR_SZ { - return fmt.Errorf("read record header: %w", io.ErrUnexpectedEOF) + return nil, 0, 0, fmt.Errorf("read record header: %w", io.ErrUnexpectedEOF) } // read the len field of the header atomically to ensure a happens before @@ -65,7 +66,7 @@ func (rr *ringReader) readRecord(rec *Record) error { // the next sample in the ring is not committed yet so we // exit without storing the reader/consumer position // and start again from the same position. - return errBusy + return nil, 0, 0, errBusy } cons += sys.BPF_RINGBUF_HDR_SZ @@ -73,7 +74,7 @@ func (rr *ringReader) readRecord(rec *Record) error { // Data is always padded to 8 byte alignment. 
dataLenAligned := uintptr(internal.Align(header.dataLen(), 8)) if remaining := prod - cons; remaining < dataLenAligned { - return fmt.Errorf("read sample data: %w", io.ErrUnexpectedEOF) + return nil, 0, 0, fmt.Errorf("read sample data: %w", io.ErrUnexpectedEOF) } start = cons & rr.mask @@ -87,15 +88,26 @@ func (rr *ringReader) readRecord(rec *Record) error { continue } - if n := header.dataLen(); cap(rec.RawSample) < n { - rec.RawSample = make([]byte, n) - } else { - rec.RawSample = rec.RawSample[:n] - } + end := int(start) + header.dataLen() + return rr.ring[start:end], int(prod - cons), cons, nil + } +} - copy(rec.RawSample, rr.ring[start:]) - rec.Remaining = int(prod - cons) - atomic.StoreUintptr(rr.cons_pos, cons) - return nil +// readRecord reads a record from an event ring, copies the sample into the provided Record and then advances the consumer position. +func (rr *ringReader) readRecord(rec *Record) error { + sample, remaining, nextCons, err := rr.readSample() + if err != nil { + return err + } + + if n := len(sample); cap(rec.RawSample) < n { + rec.RawSample = make([]byte, n) + } else { + rec.RawSample = rec.RawSample[:n] } + + copy(rec.RawSample, sample) + rec.Remaining = remaining + atomic.StoreUintptr(rr.cons_pos, nextCons) + return nil }