Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 41 additions & 1 deletion ringbuf/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ type eventRing interface {
size() int
AvailableBytes() uint64
readRecord(rec *Record) error
readRecordUnsafe(rec *Record) error
advance()
Close() error
}

Expand Down Expand Up @@ -59,6 +61,10 @@ type Record struct {

// The minimum number of bytes remaining in the ring buffer after this Record has been read.
Remaining int

// Set by readRecordUnsafe to prevent readRecord from reusing
// RawSample as a write destination (it points into read-only mmap memory).
isReadOnly bool
}

// Reader allows reading bpf_ringbuf_output
Expand Down Expand Up @@ -159,6 +165,40 @@ func (r *Reader) Read() (Record, error) {

// ReadInto behaves like Read, but fills the caller-supplied Record so that the
// Record and its associated buffers can be reused across calls.
func (r *Reader) ReadInto(rec *Record) error {
	// r.ring is dereferenced inside the closure, i.e. only when readWait
	// invokes it.
	readOnce := func() error {
		return r.ring.readRecord(rec)
	}
	return r.readWait(readOnce)
}

// ReadUnsafe is like [Reader.ReadInto], but instead of copying it sets
// rec.RawSample to a zero-copy slice of the ring buffer's memory-mapped
// region. The slice is only valid until [Reader.Commit] is called.
//
// RawSample points into read-only memory and must not be written to. For the
// same reason, if the Record is later passed to [Reader.ReadInto] the existing
// buffer is not reused; a new allocation occurs instead.
//
// ReadUnsafe does not advance the consumer position. Call [Reader.Commit] after
// processing to release the space. ReadUnsafe must not be mixed with
// [Reader.Read] or [Reader.ReadInto] between Commit calls, or an
// ErrNotCommitted error will be returned.
//
// Commit should also be called after ReadUnsafe returns an error: discarded
// records skipped before the failure leave a pending consumer position, which
// makes subsequent Read and ReadInto calls return ErrNotCommitted until it is
// committed.
func (r *Reader) ReadUnsafe(rec *Record) error {
	// r.ring is dereferenced inside the closure, i.e. only when readWait
	// invokes it.
	readOnce := func() error {
		return r.ring.readRecordUnsafe(rec)
	}
	return r.readWait(readOnce)
}

// Commit advances the consumer position, releasing the ring buffer space held
// by preceding [Reader.ReadUnsafe] calls. Slices obtained from ReadUnsafe are
// invalid once Commit returns.
//
// It is a no-op if there are no pending reads.
func (r *Reader) Commit() {
	r.mu.Lock()
	defer r.mu.Unlock()

	// Nothing to commit when there is no ring.
	if r.ring == nil {
		return
	}
	r.ring.advance()
}

func (r *Reader) readWait(read func() error) error {
r.mu.Lock()
defer r.mu.Unlock()

Expand Down Expand Up @@ -186,7 +226,7 @@ func (r *Reader) ReadInto(rec *Record) error {
}

for {
err := r.ring.readRecord(rec)
err := read()
// Not using errors.Is which is quite a bit slower
// For a tight loop it might make a difference
if err == errBusy {
Expand Down
159 changes: 159 additions & 0 deletions ringbuf/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,165 @@ func TestReadAfterClose(t *testing.T) {
}
}

// TestReadZeroCopySingle reads a single sample through ReadUnsafe, checks its
// contents, and verifies that Commit leaves no bytes available in the ring.
func TestReadZeroCopySingle(t *testing.T) {
	testutils.SkipOnOldKernel(t, "5.8", "BPF ring buffer")

	prog, events := mustOutputSamplesProg(t, sampleMessage{size: 5})

	reader, err := NewReader(events)
	qt.Assert(t, qt.IsNil(err))
	defer reader.Close()

	mustRun(t, prog)

	var record Record
	err = reader.ReadUnsafe(&record)
	qt.Assert(t, qt.IsNil(err))

	want := []byte{1, 2, 3, 4, 4}
	qt.Assert(t, qt.Equals(len(record.RawSample), 5))
	qt.Assert(t, qt.DeepEquals(record.RawSample, want))

	// Committing the only record must drain the ring.
	reader.Commit()
	qt.Assert(t, qt.Equals(reader.AvailableBytes(), 0))
}

// TestReadZeroCopyMulti performs several ReadUnsafe calls before a single
// Commit. The discarded sample in the middle must be skipped, and the Commit
// must release everything that was read.
func TestReadZeroCopyMulti(t *testing.T) {
	testutils.SkipOnOldKernel(t, "5.8", "BPF ring buffer")

	prog, events := mustOutputSamplesProg(t,
		sampleMessage{size: 5},
		sampleMessage{size: 10, discard: true},
		sampleMessage{size: 15},
	)

	reader, err := NewReader(events)
	qt.Assert(t, qt.IsNil(err))
	defer reader.Close()

	mustRun(t, prog)

	var record Record

	// Only the two non-discarded samples are expected, in order.
	for _, wantLen := range []int{5, 15} {
		err = reader.ReadUnsafe(&record)
		qt.Assert(t, qt.IsNil(err))
		qt.Assert(t, qt.Equals(len(record.RawSample), wantLen))
	}

	reader.Commit()
	qt.Assert(t, qt.Equals(reader.AvailableBytes(), 0))
}

// TestReadZeroCopyCommitReleasesSpace verifies that ring space stays occupied
// after ReadUnsafe until Commit is called, and that the reader keeps working
// afterwards.
func TestReadZeroCopyCommitReleasesSpace(t *testing.T) {
	testutils.SkipOnOldKernel(t, "5.8", "BPF ring buffer")

	prog, events := mustOutputSamplesProg(t, sampleMessage{size: 5})

	reader, err := NewReader(events)
	qt.Assert(t, qt.IsNil(err))
	defer reader.Close()

	mustRun(t, prog)

	var record Record
	err = reader.ReadUnsafe(&record)
	qt.Assert(t, qt.IsNil(err))

	// The record has been read but not committed, so it still counts as
	// available bytes.
	qt.Assert(t, qt.Not(qt.Equals(reader.AvailableBytes(), 0)))

	reader.Commit()
	qt.Assert(t, qt.Equals(reader.AvailableBytes(), 0))

	// A second round must succeed after the space was released.
	mustRun(t, prog)
	err = reader.ReadUnsafe(&record)
	qt.Assert(t, qt.IsNil(err))
	qt.Assert(t, qt.Equals(len(record.RawSample), 5))
	reader.Commit()
}

// TestReadZeroCopyDeadline checks that ReadUnsafe reports
// os.ErrDeadlineExceeded when the deadline has already passed and the program
// producing samples was never run.
func TestReadZeroCopyDeadline(t *testing.T) {
	testutils.SkipOnOldKernel(t, "5.8", "BPF ring buffer")

	_, events := mustOutputSamplesProg(t, sampleMessage{size: 5})

	reader, err := NewReader(events)
	qt.Assert(t, qt.IsNil(err))
	defer reader.Close()

	expired := time.Now().Add(-time.Second)
	reader.SetDeadline(expired)

	var record Record
	err = reader.ReadUnsafe(&record)
	qt.Assert(t, qt.ErrorIs(err, os.ErrDeadlineExceeded))
}

// TestReadIntoRejectsUncommittedZeroCopy verifies that the copying read APIs
// return ErrNotCommitted while a ReadUnsafe record is pending, and work again
// once Commit has been called.
func TestReadIntoRejectsUncommittedZeroCopy(t *testing.T) {
	testutils.SkipOnOldKernel(t, "5.8", "BPF ring buffer")

	prog, events := mustOutputSamplesProg(t, sampleMessage{size: 5})

	reader, err := NewReader(events)
	qt.Assert(t, qt.IsNil(err))
	defer reader.Close()

	mustRun(t, prog)

	var record Record
	err = reader.ReadUnsafe(&record)
	qt.Assert(t, qt.IsNil(err))

	// Both copying entry points must refuse to run before Commit.
	err = reader.ReadInto(&record)
	qt.Assert(t, qt.ErrorIs(err, ErrNotCommitted))

	_, err = reader.Read()
	qt.Assert(t, qt.ErrorIs(err, ErrNotCommitted))

	reader.Commit()

	// After committing, a regular read of fresh data succeeds.
	mustRun(t, prog)
	err = reader.ReadInto(&record)
	qt.Assert(t, qt.IsNil(err))
}

// TestCommitNoOp calls Commit on a reader with no pending ReadUnsafe records;
// the call must simply return.
func TestCommitNoOp(t *testing.T) {
	testutils.SkipOnOldKernel(t, "5.8", "BPF ring buffer")

	_, events := mustOutputSamplesProg(t, sampleMessage{size: 5})

	reader, err := NewReader(events)
	qt.Assert(t, qt.IsNil(err))
	defer reader.Close()

	// Nothing has been read, so there is nothing to commit.
	reader.Commit()
}

// BenchmarkReadZeroCopy measures one ReadUnsafe followed by Commit per
// iteration. Producing the sample is excluded from the timed region.
func BenchmarkReadZeroCopy(b *testing.B) {
	testutils.SkipOnOldKernel(b, "5.8", "BPF ring buffer")

	prog, events := mustOutputSamplesProg(b, sampleMessage{size: 80, flags: 0})

	reader, err := NewReader(events)
	if err != nil {
		b.Fatal(err)
	}
	defer reader.Close()

	b.ReportAllocs()

	var record Record
	for b.Loop() {
		// Refill the ring outside the measurement.
		b.StopTimer()
		mustRun(b, prog)
		b.StartTimer()

		readErr := reader.ReadUnsafe(&record)
		if readErr != nil {
			b.Fatal("Can't read samples:", readErr)
		}
		reader.Commit()
	}
}

func BenchmarkReader(b *testing.B) {
testutils.SkipOnOldKernel(b, "5.8", "BPF ring buffer")

Expand Down
77 changes: 65 additions & 12 deletions ringbuf/ring.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package ringbuf

import (
"errors"
"fmt"
"io"
"sync/atomic"
Expand All @@ -10,11 +11,18 @@ import (
"github.com/cilium/ebpf/internal/sys"
)

// ErrNotCommitted is returned by [Reader.Read] and [Reader.ReadInto] while
// records obtained through [Reader.ReadUnsafe] are still pending. Call
// [Reader.Commit] before using the copying read methods again.
var ErrNotCommitted = errors.New("zero-copy records not yet committed")

// ringReader reads records out of a ring buffer whose producer and consumer
// positions live in mmap'ed memory. It also tracks a deferred consumer
// position for zero-copy reads that have not been committed yet.
type ringReader struct {
	// These point into mmap'ed memory and must be accessed atomically.
	prod_pos, cons_pos *uintptr

	// NOTE(review): presumably used to wrap positions into ring; its
	// initialisation is not visible here, confirm against newRingReader.
	mask uintptr

	// Record data; readRecordUnsafe hands out sub-slices of it.
	ring []byte

	// Logical consumer position for deferred zero-copy reads.
	// Only valid when hasPending is true.
	pendingCons uintptr

	// Set by readRecordUnsafe whenever it moves pendingCons, and cleared by
	// advance once that position has been stored to cons_pos.
	hasPending bool
}

func newRingReader(cons_ptr, prod_ptr *uintptr, ring []byte) *ringReader {
Expand All @@ -41,10 +49,49 @@ func (rr *ringReader) AvailableBytes() uint64 {
return uint64(prod - cons)
}

// Read a record from an event ring.
// Like readRecordZeroCopy, but copies data into rec.RawSample and advances
// the consumer position immediately.
func (rr *ringReader) readRecord(rec *Record) error {
if rr.hasPending {
return ErrNotCommitted
}

buf := rec.RawSample
if rec.isReadOnly {
buf = nil
}

defer func() {
rec.isReadOnly = false
rr.advance()
}()

err := rr.readRecordUnsafe(rec)
if err != nil {
return err
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we get an error here and don't call rr.advance() don't we get stuck? What is the idea for recovery?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good point - an error after isDiscard=true records will not advance. fixing

}

n := len(rec.RawSample)
if cap(buf) < n {
buf = make([]byte, n)
} else {
buf = buf[:n]
}
copy(buf, rec.RawSample)
rec.RawSample = buf

return nil
}

// Sets rec.RawSample to a slice of the mmap'd ring buffer memory.
// Does not advance the consumer position; call advance separately.
func (rr *ringReader) readRecordUnsafe(rec *Record) error {
prod := atomic.LoadUintptr(rr.prod_pos)
cons := atomic.LoadUintptr(rr.cons_pos)

cons := rr.pendingCons
if !rr.hasPending {
cons = atomic.LoadUintptr(rr.cons_pos)
}

for {
if remaining := prod - cons; remaining == 0 {
Expand Down Expand Up @@ -81,21 +128,27 @@ func (rr *ringReader) readRecord(rec *Record) error {

if header.isDiscard() {
// when the record header indicates that the data should be
// discarded, we skip it by just updating the consumer position
// discarded, we skip it by just updating the pending position
// to the next record.
atomic.StoreUintptr(rr.cons_pos, cons)
rr.pendingCons = cons
rr.hasPending = true
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What clears hasPending if we never reach a non-discard record?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If every record seen by readRecordZeroCopy has isDiscard=true, advance must still be called by the user. The user's readWait keeps waiting for a record, so it should eventually return a non-discard record to the user.
However, even if an error is returned to the user (for example, a timeout), Commit must still be called. This is a very important detail that must be added to the documentation.

At first, when I read your comment, I wanted to address it by simply advancing the ring buffer for isDiscard=true records and for errors. But this can't be done, because the user may still hold a pointer into the internal ring buffer that hasn't been released (Commit wasn't called).

continue
}

if n := header.dataLen(); cap(rec.RawSample) < n {
rec.RawSample = make([]byte, n)
} else {
rec.RawSample = rec.RawSample[:n]
}

copy(rec.RawSample, rr.ring[start:])
n := header.dataLen()
rec.RawSample = rr.ring[start : start+uintptr(n)]
rec.Remaining = int(prod - cons)
atomic.StoreUintptr(rr.cons_pos, cons)
rec.isReadOnly = true
rr.pendingCons = cons
rr.hasPending = true
return nil
}
}

// Commits the pending consumer position from readRecordZeroCopy calls.
func (rr *ringReader) advance() {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After atomic.StoreUintptr() returns, the Kernel may overwrite the mmap'd region immediately. Any RawSample slice handed out by ReadZeroCopy is now a dangling view into potentially live Kernel memory. The documentation warns callers, but nothing prevents silent use-after-commit — Go's GC keeps the mapping alive while rec.RawSample is reachable, so there's no crash, just corrupt data.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes :)
This will be a problem in every implementation of such a feature.
This may be "hidden" because of the function name. May be ReadZeroCopy should be ReadUnsafe?

if rr.hasPending {
atomic.StoreUintptr(rr.cons_pos, rr.pendingCons)
rr.hasPending = false
}
}
8 changes: 8 additions & 0 deletions ringbuf/ring_other.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,14 @@ func newRingBufEventRing(mapFD, size int) (*mmapEventRing, error) {
return ring, nil
}

// readRecordUnsafe forwards to the readRecordUnsafe method of the ring's
// ringReader field and returns its error unchanged.
func (ring *mmapEventRing) readRecordUnsafe(rec *Record) error {
	return ring.ringReader.readRecordUnsafe(rec)
}

// advance forwards to the advance method of the ring's ringReader field.
func (ring *mmapEventRing) advance() {
	ring.ringReader.advance()
}

func (ring *mmapEventRing) Close() error {
ring.cleanup.Stop()

Expand Down
8 changes: 8 additions & 0 deletions ringbuf/ring_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,14 @@ func newRingBufEventRing(mapFD, size int) (*windowsEventRing, error) {
return ring, nil
}

// readRecordUnsafe forwards to the readRecordUnsafe method of the ring's
// ringReader field and returns its error unchanged.
func (ring *windowsEventRing) readRecordUnsafe(rec *Record) error {
	return ring.ringReader.readRecordUnsafe(rec)
}

// advance forwards to the advance method of the ring's ringReader field.
func (ring *windowsEventRing) advance() {
	ring.ringReader.advance()
}

func (ring *windowsEventRing) Close() error {
ring.cleanup.Stop()

Expand Down
Loading