From d1f0ef5e4d39e6f3e49c26243219544d1fdacdfc Mon Sep 17 00:00:00 2001 From: Ori Shussman Date: Thu, 19 Mar 2026 16:03:24 +0000 Subject: [PATCH] ringbuf: add zero-copy read with deferred consumer advancement Signed-off-by: Ori Shussman --- ringbuf/reader.go | 42 ++++++++++- ringbuf/reader_test.go | 159 ++++++++++++++++++++++++++++++++++++++++ ringbuf/ring.go | 77 ++++++++++++++++--- ringbuf/ring_other.go | 8 ++ ringbuf/ring_windows.go | 8 ++ 5 files changed, 281 insertions(+), 13 deletions(-) diff --git a/ringbuf/reader.go b/ringbuf/reader.go index d1483d78a..4bc31989a 100644 --- a/ringbuf/reader.go +++ b/ringbuf/reader.go @@ -31,6 +31,8 @@ type eventRing interface { size() int AvailableBytes() uint64 readRecord(rec *Record) error + readRecordUnsafe(rec *Record) error + advance() Close() error } @@ -59,6 +61,10 @@ type Record struct { // The minimum number of bytes remaining in the ring buffer after this Record has been read. Remaining int + + // Set by readRecordZeroCopy to prevent readRecord from reusing + // RawSample as a write destination (it points into read-only mmap memory). + isReadOnly bool } // Reader allows reading bpf_ringbuf_output @@ -159,6 +165,40 @@ func (r *Reader) Read() (Record, error) { // ReadInto is like Read except that it allows reusing Record and associated buffers. func (r *Reader) ReadInto(rec *Record) error { + return r.readWait(func() error { + return r.ring.readRecord(rec) + }) +} + +// Like [Reader.ReadInto], but returns a zero-copy slice into the ring buffer's +// memory-mapped region instead of copying. The slice is valid until [Reader.Commit]. +// +// RawSample points into read-only memory and must not be written to. If the +// Record is later passed to [Reader.ReadInto], the existing buffer will not be +// reused for the same reason; a new allocation will occur instead. +// +// Does not advance the consumer position. Call [Reader.Commit] after processing +// to release space. Must not be mixed with [Reader.Read] or [Reader.ReadInto] +// between Commit calls, or an ErrNotCommitted error will be returned. +func (r *Reader) ReadUnsafe(rec *Record) error { + return r.readWait(func() error { + return r.ring.readRecordUnsafe(rec) + }) +} + +// Advances the consumer position, releasing ring buffer space from preceding +// [Reader.ReadUnsafe] calls. Slices from ReadUnsafe are invalid after this. +// No-op if there are no pending reads. +func (r *Reader) Commit() { + r.mu.Lock() + defer r.mu.Unlock() + + if r.ring != nil { + r.ring.advance() + } +} + +func (r *Reader) readWait(read func() error) error { r.mu.Lock() defer r.mu.Unlock() @@ -186,7 +226,7 @@ func (r *Reader) ReadInto(rec *Record) error { } for { - err := r.ring.readRecord(rec) + err := read() // Not using errors.Is which is quite a bit slower // For a tight loop it might make a difference if err == errBusy { diff --git a/ringbuf/reader_test.go b/ringbuf/reader_test.go index 89de15df0..f8c5540bc 100644 --- a/ringbuf/reader_test.go +++ b/ringbuf/reader_test.go @@ -322,6 +322,165 @@ func TestReadAfterClose(t *testing.T) { } } +func TestReadZeroCopySingle(t *testing.T) { + testutils.SkipOnOldKernel(t, "5.8", "BPF ring buffer") + + prog, events := mustOutputSamplesProg(t, sampleMessage{size: 5}) + + rd, err := NewReader(events) + qt.Assert(t, qt.IsNil(err)) + defer rd.Close() + + mustRun(t, prog) + + var rec Record + err = rd.ReadUnsafe(&rec) + qt.Assert(t, qt.IsNil(err)) + qt.Assert(t, qt.Equals(len(rec.RawSample), 5)) + qt.Assert(t, qt.DeepEquals(rec.RawSample, []byte{1, 2, 3, 4, 4})) + + rd.Commit() + qt.Assert(t, qt.Equals(rd.AvailableBytes(), 0)) +} + +func TestReadZeroCopyMulti(t *testing.T) { + testutils.SkipOnOldKernel(t, "5.8", "BPF ring buffer") + + prog, events := mustOutputSamplesProg(t, + sampleMessage{size: 5}, + sampleMessage{size: 10, discard: true}, + sampleMessage{size: 15}, + ) + + rd, err := NewReader(events) + qt.Assert(t, qt.IsNil(err)) + defer rd.Close() + + mustRun(t, prog) + + var rec Record + + err = rd.ReadUnsafe(&rec) + qt.Assert(t, qt.IsNil(err)) + qt.Assert(t, qt.Equals(len(rec.RawSample), 5)) + + err = rd.ReadUnsafe(&rec) + qt.Assert(t, qt.IsNil(err)) + qt.Assert(t, qt.Equals(len(rec.RawSample), 15)) + + rd.Commit() + qt.Assert(t, qt.Equals(rd.AvailableBytes(), 0)) +} + +func TestReadZeroCopyCommitReleasesSpace(t *testing.T) { + testutils.SkipOnOldKernel(t, "5.8", "BPF ring buffer") + + prog, events := mustOutputSamplesProg(t, sampleMessage{size: 5}) + + rd, err := NewReader(events) + qt.Assert(t, qt.IsNil(err)) + defer rd.Close() + + mustRun(t, prog) + + var rec Record + err = rd.ReadUnsafe(&rec) + qt.Assert(t, qt.IsNil(err)) + + qt.Assert(t, qt.Not(qt.Equals(rd.AvailableBytes(), 0))) + + rd.Commit() + qt.Assert(t, qt.Equals(rd.AvailableBytes(), 0)) + + mustRun(t, prog) + err = rd.ReadUnsafe(&rec) + qt.Assert(t, qt.IsNil(err)) + qt.Assert(t, qt.Equals(len(rec.RawSample), 5)) + rd.Commit() +} + +func TestReadZeroCopyDeadline(t *testing.T) { + testutils.SkipOnOldKernel(t, "5.8", "BPF ring buffer") + + _, events := mustOutputSamplesProg(t, sampleMessage{size: 5}) + + rd, err := NewReader(events) + qt.Assert(t, qt.IsNil(err)) + defer rd.Close() + + rd.SetDeadline(time.Now().Add(-time.Second)) + + var rec Record + err = rd.ReadUnsafe(&rec) + qt.Assert(t, qt.ErrorIs(err, os.ErrDeadlineExceeded)) +} + +func TestReadIntoRejectsUncommittedZeroCopy(t *testing.T) { + testutils.SkipOnOldKernel(t, "5.8", "BPF ring buffer") + + prog, events := mustOutputSamplesProg(t, sampleMessage{size: 5}) + + rd, err := NewReader(events) + qt.Assert(t, qt.IsNil(err)) + defer rd.Close() + + mustRun(t, prog) + + var rec Record + err = rd.ReadUnsafe(&rec) + qt.Assert(t, qt.IsNil(err)) + + err = rd.ReadInto(&rec) + qt.Assert(t, qt.ErrorIs(err, ErrNotCommitted)) + + _, err = rd.Read() + qt.Assert(t, qt.ErrorIs(err, ErrNotCommitted)) + + rd.Commit() + + mustRun(t, prog) + err = rd.ReadInto(&rec) + qt.Assert(t, qt.IsNil(err)) +} + +func TestCommitNoOp(t *testing.T) { + testutils.SkipOnOldKernel(t, "5.8", "BPF ring buffer") + + _, events := mustOutputSamplesProg(t, sampleMessage{size: 5}) + + rd, err := NewReader(events) + qt.Assert(t, qt.IsNil(err)) + defer rd.Close() + + rd.Commit() +} + +func BenchmarkReadZeroCopy(b *testing.B) { + testutils.SkipOnOldKernel(b, "5.8", "BPF ring buffer") + + prog, events := mustOutputSamplesProg(b, sampleMessage{size: 80, flags: 0}) + + rd, err := NewReader(events) + if err != nil { + b.Fatal(err) + } + defer rd.Close() + + b.ReportAllocs() + + var rec Record + for b.Loop() { + b.StopTimer() + mustRun(b, prog) + b.StartTimer() + + if err := rd.ReadUnsafe(&rec); err != nil { + b.Fatal("Can't read samples:", err) + } + rd.Commit() + } +} + func BenchmarkReader(b *testing.B) { testutils.SkipOnOldKernel(b, "5.8", "BPF ring buffer") diff --git a/ringbuf/ring.go b/ringbuf/ring.go index abb704cd5..00a255ad6 100644 --- a/ringbuf/ring.go +++ b/ringbuf/ring.go @@ -1,6 +1,7 @@ package ringbuf import ( + "errors" "fmt" "io" "sync/atomic" @@ -10,11 +11,18 @@ import ( "github.com/cilium/ebpf/internal/sys" ) +var ErrNotCommitted = errors.New("zero-copy records not yet committed") + type ringReader struct { // These point into mmap'ed memory and must be accessed atomically. prod_pos, cons_pos *uintptr mask uintptr ring []byte + + // Logical consumer position for deferred zero-copy reads. + // Only valid when hasPending is true. + pendingCons uintptr + hasPending bool } func newRingReader(cons_ptr, prod_ptr *uintptr, ring []byte) *ringReader { @@ -41,10 +49,49 @@ func (rr *ringReader) AvailableBytes() uint64 { return uint64(prod - cons) } -// Read a record from an event ring. +// Like readRecordZeroCopy, but copies data into rec.RawSample and advances +// the consumer position immediately. func (rr *ringReader) readRecord(rec *Record) error { + if rr.hasPending { + return ErrNotCommitted + } + + buf := rec.RawSample + if rec.isReadOnly { + buf = nil + } + + defer func() { + rec.isReadOnly = false + rr.advance() + }() + + err := rr.readRecordUnsafe(rec) + if err != nil { + return err + } + + n := len(rec.RawSample) + if cap(buf) < n { + buf = make([]byte, n) + } else { + buf = buf[:n] + } + copy(buf, rec.RawSample) + rec.RawSample = buf + + return nil +} + +// Sets rec.RawSample to a slice of the mmap'd ring buffer memory. +// Does not advance the consumer position; call advance separately. +func (rr *ringReader) readRecordUnsafe(rec *Record) error { prod := atomic.LoadUintptr(rr.prod_pos) - cons := atomic.LoadUintptr(rr.cons_pos) + + cons := rr.pendingCons + if !rr.hasPending { + cons = atomic.LoadUintptr(rr.cons_pos) + } for { if remaining := prod - cons; remaining == 0 { @@ -81,21 +128,27 @@ func (rr *ringReader) readRecord(rec *Record) error { if header.isDiscard() { // when the record header indicates that the data should be - // discarded, we skip it by just updating the consumer position + // discarded, we skip it by just updating the pending position // to the next record. - atomic.StoreUintptr(rr.cons_pos, cons) + rr.pendingCons = cons + rr.hasPending = true continue } - if n := header.dataLen(); cap(rec.RawSample) < n { - rec.RawSample = make([]byte, n) - } else { - rec.RawSample = rec.RawSample[:n] - } - - copy(rec.RawSample, rr.ring[start:]) + n := header.dataLen() + rec.RawSample = rr.ring[start : start+uintptr(n)] rec.Remaining = int(prod - cons) - atomic.StoreUintptr(rr.cons_pos, cons) + rec.isReadOnly = true + rr.pendingCons = cons + rr.hasPending = true return nil } } + +// Commits the pending consumer position from readRecordZeroCopy calls. +func (rr *ringReader) advance() { + if rr.hasPending { + atomic.StoreUintptr(rr.cons_pos, rr.pendingCons) + rr.hasPending = false + } +} diff --git a/ringbuf/ring_other.go b/ringbuf/ring_other.go index 339ef736a..3acccfb6e 100644 --- a/ringbuf/ring_other.go +++ b/ringbuf/ring_other.go @@ -49,6 +49,14 @@ func newRingBufEventRing(mapFD, size int) (*mmapEventRing, error) { return ring, nil } +func (ring *mmapEventRing) readRecordUnsafe(rec *Record) error { + return ring.ringReader.readRecordUnsafe(rec) +} + +func (ring *mmapEventRing) advance() { + ring.ringReader.advance() +} + func (ring *mmapEventRing) Close() error { ring.cleanup.Stop() diff --git a/ringbuf/ring_windows.go b/ringbuf/ring_windows.go index b89be3313..c204ec497 100644 --- a/ringbuf/ring_windows.go +++ b/ringbuf/ring_windows.go @@ -62,6 +62,14 @@ func newRingBufEventRing(mapFD, size int) (*windowsEventRing, error) { return ring, nil } +func (ring *windowsEventRing) readRecordUnsafe(rec *Record) error { + return ring.ringReader.readRecordUnsafe(rec) +} + +func (ring *windowsEventRing) advance() { + ring.ringReader.advance() +} + func (ring *windowsEventRing) Close() error { ring.cleanup.Stop()