Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 41 additions & 1 deletion ringbuf/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ type eventRing interface {
size() int
AvailableBytes() uint64
readRecord(rec *Record) error
readRecordUnsafe(rec *Record) error
advance()
Close() error
}

Expand Down Expand Up @@ -59,6 +61,10 @@ type Record struct {

// The minimum number of bytes remaining in the ring buffer after this Record has been read.
Remaining int

// isReadOnly is set by readRecordUnsafe to prevent readRecord from reusing
// RawSample as a write destination (it points into read-only mmap memory).
isReadOnly bool
}

// Reader allows reading bpf_ringbuf_output
Expand Down Expand Up @@ -159,6 +165,40 @@ func (r *Reader) Read() (Record, error) {

// ReadInto behaves like Read, but fills the caller-provided Record so that the
// Record and its associated buffers can be reused across calls.
func (r *Reader) ReadInto(rec *Record) error {
	// readWait invokes this callback to perform the actual copying read.
	copyNext := func() error {
		return r.ring.readRecord(rec)
	}
	return r.readWait(copyNext)
}

// ReadUnsafe is like [Reader.ReadInto], except that rec.RawSample is set to a
// slice of the ring buffer's memory-mapped region instead of a copy. The slice
// is valid only until [Reader.Commit] is called.
//
// RawSample points into read-only memory and must not be written to. For the
// same reason, a Record filled by ReadUnsafe that is later passed to
// [Reader.ReadInto] does not have its buffer reused; a new allocation occurs
// instead.
//
// ReadUnsafe does not advance the consumer position. Call [Reader.Commit]
// after processing to release space. It must not be mixed with [Reader.Read]
// or [Reader.ReadInto] between Commit calls, otherwise those calls return
// ErrNotCommitted.
func (r *Reader) ReadUnsafe(rec *Record) error {
	// readWait invokes this callback to perform the actual zero-copy read.
	borrowNext := func() error {
		return r.ring.readRecordUnsafe(rec)
	}
	return r.readWait(borrowNext)
}

// Commit advances the consumer position, releasing the ring buffer space held
// by preceding [Reader.ReadUnsafe] calls. Slices obtained from ReadUnsafe are
// invalid afterwards. It is a no-op if there are no pending reads.
func (r *Reader) Commit() {
	r.mu.Lock()
	defer r.mu.Unlock()

	// Nothing to advance when the reader has no ring.
	if r.ring == nil {
		return
	}
	r.ring.advance()
}

func (r *Reader) readWait(read func() error) error {
r.mu.Lock()
defer r.mu.Unlock()

Expand Down Expand Up @@ -186,7 +226,7 @@ func (r *Reader) ReadInto(rec *Record) error {
}

for {
err := r.ring.readRecord(rec)
err := read()
// Not using errors.Is which is quite a bit slower
// For a tight loop it might make a difference
if err == errBusy {
Expand Down
159 changes: 159 additions & 0 deletions ringbuf/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,165 @@ func TestReadAfterClose(t *testing.T) {
}
}

// TestReadZeroCopySingle reads one sample through ReadUnsafe, checks its
// length and contents, and verifies that no bytes remain available once the
// read has been committed.
func TestReadZeroCopySingle(t *testing.T) {
	testutils.SkipOnOldKernel(t, "5.8", "BPF ring buffer")

	producer, ringMap := mustOutputSamplesProg(t, sampleMessage{size: 5})

	reader, err := NewReader(ringMap)
	qt.Assert(t, qt.IsNil(err))
	defer reader.Close()

	mustRun(t, producer)

	var record Record
	qt.Assert(t, qt.IsNil(reader.ReadUnsafe(&record)))
	qt.Assert(t, qt.Equals(len(record.RawSample), 5))
	qt.Assert(t, qt.DeepEquals(record.RawSample, []byte{1, 2, 3, 4, 4}))

	reader.Commit()
	qt.Assert(t, qt.Equals(reader.AvailableBytes(), 0))
}

// TestReadZeroCopyMulti issues two ReadUnsafe calls before a single Commit.
// The program emits three samples, the middle one flagged as discard; the two
// reads are expected to return the 5-byte and the 15-byte sample.
func TestReadZeroCopyMulti(t *testing.T) {
	testutils.SkipOnOldKernel(t, "5.8", "BPF ring buffer")

	producer, ringMap := mustOutputSamplesProg(t,
		sampleMessage{size: 5},
		sampleMessage{size: 10, discard: true},
		sampleMessage{size: 15},
	)

	reader, err := NewReader(ringMap)
	qt.Assert(t, qt.IsNil(err))
	defer reader.Close()

	mustRun(t, producer)

	var record Record
	for _, wantLen := range []int{5, 15} {
		qt.Assert(t, qt.IsNil(reader.ReadUnsafe(&record)))
		qt.Assert(t, qt.Equals(len(record.RawSample), wantLen))
	}

	// One Commit covers both preceding reads.
	reader.Commit()
	qt.Assert(t, qt.Equals(reader.AvailableBytes(), 0))
}

// TestReadZeroCopyCommitReleasesSpace checks that bytes stay reported as
// available after ReadUnsafe until Commit is called, and that the reader can
// be used for another ReadUnsafe/Commit cycle afterwards.
func TestReadZeroCopyCommitReleasesSpace(t *testing.T) {
	testutils.SkipOnOldKernel(t, "5.8", "BPF ring buffer")

	producer, ringMap := mustOutputSamplesProg(t, sampleMessage{size: 5})

	reader, err := NewReader(ringMap)
	qt.Assert(t, qt.IsNil(err))
	defer reader.Close()

	mustRun(t, producer)

	var record Record
	qt.Assert(t, qt.IsNil(reader.ReadUnsafe(&record)))

	// The read has not been committed yet, so the space is still occupied.
	qt.Assert(t, qt.Not(qt.Equals(reader.AvailableBytes(), 0)))

	reader.Commit()
	qt.Assert(t, qt.Equals(reader.AvailableBytes(), 0))

	// Second round on the same reader.
	mustRun(t, producer)
	qt.Assert(t, qt.IsNil(reader.ReadUnsafe(&record)))
	qt.Assert(t, qt.Equals(len(record.RawSample), 5))
	reader.Commit()
}

// TestReadZeroCopyDeadline sets a deadline that has already passed and expects
// ReadUnsafe to fail with os.ErrDeadlineExceeded. The producer program is
// never run.
func TestReadZeroCopyDeadline(t *testing.T) {
	testutils.SkipOnOldKernel(t, "5.8", "BPF ring buffer")

	_, ringMap := mustOutputSamplesProg(t, sampleMessage{size: 5})

	reader, err := NewReader(ringMap)
	qt.Assert(t, qt.IsNil(err))
	defer reader.Close()

	expired := time.Now().Add(-time.Second)
	reader.SetDeadline(expired)

	var record Record
	err = reader.ReadUnsafe(&record)
	qt.Assert(t, qt.ErrorIs(err, os.ErrDeadlineExceeded))
}

// TestReadIntoRejectsUncommittedZeroCopy verifies that ReadInto and Read
// return ErrNotCommitted while a ReadUnsafe is outstanding, and that ReadInto
// succeeds again once Commit has been called.
func TestReadIntoRejectsUncommittedZeroCopy(t *testing.T) {
	testutils.SkipOnOldKernel(t, "5.8", "BPF ring buffer")

	producer, ringMap := mustOutputSamplesProg(t, sampleMessage{size: 5})

	reader, err := NewReader(ringMap)
	qt.Assert(t, qt.IsNil(err))
	defer reader.Close()

	mustRun(t, producer)

	var record Record
	qt.Assert(t, qt.IsNil(reader.ReadUnsafe(&record)))

	// Copying reads are refused until the zero-copy read is committed.
	qt.Assert(t, qt.ErrorIs(reader.ReadInto(&record), ErrNotCommitted))

	_, err = reader.Read()
	qt.Assert(t, qt.ErrorIs(err, ErrNotCommitted))

	reader.Commit()

	mustRun(t, producer)
	qt.Assert(t, qt.IsNil(reader.ReadInto(&record)))
}

// TestCommitNoOp calls Commit on a freshly created reader without any
// preceding ReadUnsafe; the call just has to return.
func TestCommitNoOp(t *testing.T) {
	testutils.SkipOnOldKernel(t, "5.8", "BPF ring buffer")

	_, ringMap := mustOutputSamplesProg(t, sampleMessage{size: 5})

	reader, err := NewReader(ringMap)
	qt.Assert(t, qt.IsNil(err))
	defer reader.Close()

	reader.Commit()
}

// BenchmarkReadZeroCopy measures one ReadUnsafe followed by Commit per
// iteration. Producing the sample is excluded from the timed region.
func BenchmarkReadZeroCopy(b *testing.B) {
	testutils.SkipOnOldKernel(b, "5.8", "BPF ring buffer")

	producer, ringMap := mustOutputSamplesProg(b, sampleMessage{size: 80, flags: 0})

	reader, err := NewReader(ringMap)
	if err != nil {
		b.Fatal(err)
	}
	defer reader.Close()

	b.ReportAllocs()

	var record Record
	for b.Loop() {
		// Refill the ring outside of the measurement.
		b.StopTimer()
		mustRun(b, producer)
		b.StartTimer()

		err := reader.ReadUnsafe(&record)
		if err != nil {
			b.Fatal("Can't read samples:", err)
		}
		reader.Commit()
	}
}

func BenchmarkReader(b *testing.B) {
testutils.SkipOnOldKernel(b, "5.8", "BPF ring buffer")

Expand Down
77 changes: 65 additions & 12 deletions ringbuf/ring.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package ringbuf

import (
"errors"
"fmt"
"io"
"sync/atomic"
Expand All @@ -10,11 +11,18 @@ import (
"github.com/cilium/ebpf/internal/sys"
)

// ErrNotCommitted is returned by readRecord while a zero-copy read is still
// pending, that is, after readRecordUnsafe has recorded a pending consumer
// position and before advance has published it.
var ErrNotCommitted = errors.New("zero-copy records not yet committed")

// ringReader holds the state needed to consume records from a ring buffer:
// the producer and consumer positions, the index mask, the data area, and the
// bookkeeping for zero-copy reads that have not been committed yet.
type ringReader struct {
	// prod_pos and cons_pos point into mmap'ed memory and must be accessed
	// atomically.
	prod_pos *uintptr
	cons_pos *uintptr

	mask uintptr
	ring []byte

	// pendingCons is the logical consumer position for deferred zero-copy
	// reads. It is only valid while hasPending is true.
	pendingCons uintptr
	hasPending  bool
}

func newRingReader(cons_ptr, prod_ptr *uintptr, ring []byte) *ringReader {
Expand All @@ -41,10 +49,49 @@ func (rr *ringReader) AvailableBytes() uint64 {
return uint64(prod - cons)
}

// readRecord reads a record from the ring like readRecordUnsafe, but copies
// the sample into a buffer owned by rec and advances the consumer position
// before returning.
//
// It returns ErrNotCommitted while a zero-copy read is pending. Any error from
// readRecordUnsafe is returned unchanged; in that case rec is left untouched.
//
// rec.RawSample is reused as the copy destination when it is large enough,
// unless rec.isReadOnly marks it as aliasing the ring's mapping, in which case
// a fresh buffer is allocated.
func (rr *ringReader) readRecord(rec *Record) error {
	if rr.hasPending {
		return ErrNotCommitted
	}

	buf := rec.RawSample
	if rec.isReadOnly {
		// RawSample still aliases the ring from an earlier zero-copy read and
		// must not be used as a write destination.
		buf = nil
	}

	// Publish the consumer position on every path: readRecordUnsafe may have
	// moved the pending position past discarded records before failing. The
	// deferred call also ensures the copy below finishes before the space is
	// released.
	defer rr.advance()

	if err := rr.readRecordUnsafe(rec); err != nil {
		// Deliberately keep rec.isReadOnly as it is. rec.RawSample was not
		// replaced, so if it aliased the ring before it still does, and a
		// retry must not treat it as a reusable buffer.
		return err
	}

	n := len(rec.RawSample)
	if cap(buf) < n {
		buf = make([]byte, n)
	} else {
		buf = buf[:n]
	}
	copy(buf, rec.RawSample)
	rec.RawSample = buf

	// RawSample is backed by memory owned by rec from here on.
	rec.isReadOnly = false

	return nil
}

// Sets rec.RawSample to a slice of the mmap'd ring buffer memory.
// Does not advance the consumer position; call advance separately.
func (rr *ringReader) readRecordUnsafe(rec *Record) error {
prod := atomic.LoadUintptr(rr.prod_pos)
cons := atomic.LoadUintptr(rr.cons_pos)

cons := rr.pendingCons
if !rr.hasPending {
cons = atomic.LoadUintptr(rr.cons_pos)
}

for {
if remaining := prod - cons; remaining == 0 {
Expand Down Expand Up @@ -81,21 +128,27 @@ func (rr *ringReader) readRecord(rec *Record) error {

if header.isDiscard() {
// when the record header indicates that the data should be
// discarded, we skip it by just updating the consumer position
// discarded, we skip it by just updating the pending position
// to the next record.
atomic.StoreUintptr(rr.cons_pos, cons)
rr.pendingCons = cons
rr.hasPending = true
continue
}

if n := header.dataLen(); cap(rec.RawSample) < n {
rec.RawSample = make([]byte, n)
} else {
rec.RawSample = rec.RawSample[:n]
}

copy(rec.RawSample, rr.ring[start:])
n := header.dataLen()
rec.RawSample = rr.ring[start : start+uintptr(n)]
rec.Remaining = int(prod - cons)
atomic.StoreUintptr(rr.cons_pos, cons)
rec.isReadOnly = true
rr.pendingCons = cons
rr.hasPending = true
return nil
}
}

// advance publishes the pending consumer position recorded by
// readRecordUnsafe to the shared consumer position and clears the pending
// state. It does nothing when no read is pending.
func (rr *ringReader) advance() {
	if !rr.hasPending {
		return
	}

	atomic.StoreUintptr(rr.cons_pos, rr.pendingCons)
	rr.hasPending = false
}
8 changes: 8 additions & 0 deletions ringbuf/ring_other.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,14 @@ func newRingBufEventRing(mapFD, size int) (*mmapEventRing, error) {
return ring, nil
}

// readRecordUnsafe delegates to ring.ringReader.readRecordUnsafe and returns
// its result unchanged.
func (ring *mmapEventRing) readRecordUnsafe(rec *Record) error {
return ring.ringReader.readRecordUnsafe(rec)
}

// advance delegates to ring.ringReader.advance.
func (ring *mmapEventRing) advance() {
ring.ringReader.advance()
}

func (ring *mmapEventRing) Close() error {
ring.cleanup.Stop()

Expand Down
8 changes: 8 additions & 0 deletions ringbuf/ring_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,14 @@ func newRingBufEventRing(mapFD, size int) (*windowsEventRing, error) {
return ring, nil
}

// readRecordUnsafe delegates to ring.ringReader.readRecordUnsafe and returns
// its result unchanged.
func (ring *windowsEventRing) readRecordUnsafe(rec *Record) error {
return ring.ringReader.readRecordUnsafe(rec)
}

// advance delegates to ring.ringReader.advance.
func (ring *windowsEventRing) advance() {
ring.ringReader.advance()
}

func (ring *windowsEventRing) Close() error {
ring.cleanup.Stop()

Expand Down
Loading