From 4f3ebb79931b402af69510a451eb1f464e5eaf98 Mon Sep 17 00:00:00 2001 From: graymon Date: Thu, 8 Jan 2026 00:00:38 +0800 Subject: [PATCH] ringbuf: add zero-copy reader API MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Introduce Reader.ReadZeroCopy to process samples directly from the ring buffer via a callback. Refactor the reader to share the same polling loop between ReadInto and ReadZeroCopy, and add ringReader.readRecordZeroCopy as the internal zero-copy path, with readRecord delegating to it. Benchmarks (80–2048B) show zero-copy is flat at small sizes and scales much better for larger records, reaching up to 5.63x speedup. Signed-off-by: graymon --- ringbuf/reader.go | 18 ++++++++++++++++-- ringbuf/ring.go | 27 ++++++++++++++++++--------- 2 files changed, 34 insertions(+), 11 deletions(-) diff --git a/ringbuf/reader.go b/ringbuf/reader.go index 0864f2ae1..daa29fca1 100644 --- a/ringbuf/reader.go +++ b/ringbuf/reader.go @@ -135,7 +135,7 @@ func (r *Reader) SetDeadline(t time.Time) { // Returns [os.ErrDeadlineExceeded] if a deadline was set and after all records // have been read from the ring. // -// See [ReadInto] for a more efficient version of this method. +// See [ReadInto] or [ReadZeroCopy] for more efficient versions of this method. func (r *Reader) Read() (Record, error) { var rec Record err := r.ReadInto(&rec) @@ -144,6 +144,20 @@ func (r *Reader) Read() (Record, error) { // ReadInto is like Read except that it allows reusing Record and associated buffers. func (r *Reader) ReadInto(rec *Record) error { + return r.readLockedPoll(func() error { + return r.ring.readRecord(rec) + }) +} + +// ReadZeroCopy reads the next record from the BPF ringbuf using a zero-copy callback. +// The sample slice is only valid until the callback returns. +func (r *Reader) ReadZeroCopy(f func(sample []byte, remaining int) error) error { + return r.readLockedPoll(func() error { + return r.ring.readRecordZeroCopy(f) + }) +} + +func (r *Reader) readLockedPoll(read func() error) error { r.mu.Lock() defer r.mu.Unlock() @@ -171,7 +185,7 @@ func (r *Reader) ReadInto(rec *Record) error { } for { - err := r.ring.readRecord(rec) + err := read() // Not using errors.Is which is quite a bit slower // For a tight loop it might make a difference if err == errBusy { diff --git a/ringbuf/ring.go b/ringbuf/ring.go index abb704cd5..f1bcc4e3b 100644 --- a/ringbuf/ring.go +++ b/ringbuf/ring.go @@ -43,6 +43,21 @@ func (rr *ringReader) AvailableBytes() uint64 { // Read a record from an event ring. func (rr *ringReader) readRecord(rec *Record) error { + return rr.readRecordZeroCopy(func(sample []byte, remaining int) error { + n := len(sample) + if cap(rec.RawSample) < n { + rec.RawSample = make([]byte, n) + } else { + rec.RawSample = rec.RawSample[:n] + } + copy(rec.RawSample, sample) + rec.Remaining = remaining + + return nil + }) +} + +func (rr *ringReader) readRecordZeroCopy(f func(sample []byte, remaining int) error) error { prod := atomic.LoadUintptr(rr.prod_pos) cons := atomic.LoadUintptr(rr.cons_pos) @@ -87,15 +102,9 @@ func (rr *ringReader) readRecord(rec *Record) error { continue } - if n := header.dataLen(); cap(rec.RawSample) < n { - rec.RawSample = make([]byte, n) - } else { - rec.RawSample = rec.RawSample[:n] - } - - copy(rec.RawSample, rr.ring[start:]) - rec.Remaining = int(prod - cons) + n := header.dataLen() + err := f(rr.ring[start:start+uintptr(n)], int(prod-cons)) atomic.StoreUintptr(rr.cons_pos, cons) - return nil + return err } }