Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions ringbuf/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,7 @@
// BPF allows submitting custom events to a BPF ring buffer map set up
// by userspace. This is very useful to push things like packet samples
// from BPF to a daemon running in user space.
//
// UnsafeReader provides zero-copy access to samples and is not safe for
// concurrent use.
package ringbuf
48 changes: 48 additions & 0 deletions ringbuf/read_internal.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package ringbuf

import (
"errors"
"fmt"
"os"
"time"
)

func readWithPoll(poller poller, ring eventRing, deadline time.Time, haveData *bool, pendingErr *error, read func() error) error {
if ring == nil {
return fmt.Errorf("ringbuffer: %w", ErrClosed)
}

for {
if !*haveData {
if pe := *pendingErr; pe != nil {
*pendingErr = nil
return pe
}

err := poller.Wait(deadline)
if errors.Is(err, os.ErrDeadlineExceeded) || errors.Is(err, ErrFlushed) {
// Ignoring this for reading a valid entry after timeout or flush.
// This can occur if the producer submitted to the ring buffer
// with BPF_RB_NO_WAKEUP.
*pendingErr = err
} else if err != nil {
return err
}
*haveData = true
}

for {
err := read()
// Not using errors.Is which is quite a bit slower.
// For a tight loop it might make a difference.
if err == errBusy {
continue
}
if err == errEOR {
*haveData = false
break
}
return err
}
}
}
88 changes: 38 additions & 50 deletions ringbuf/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ type poller interface {
type eventRing interface {
size() int
AvailableBytes() uint64
readRecord(rec *Record) error
readRecordFunc(func(sample []byte, remaining int, cons uintptr) error) error
commitRecord(cons uintptr)
Close() error
}

Expand Down Expand Up @@ -77,40 +78,49 @@ type Reader struct {

// NewReader creates a new BPF ringbuf reader.
func NewReader(ringbufMap *ebpf.Map) (*Reader, error) {
poller, ring, err := newReaderResources(ringbufMap)
if err != nil {
return nil, err
}

return &Reader{
poller: poller,
ring: ring,
bufferSize: ring.size(),
// On Windows, the wait handle is only set when the reader is created,
// so we miss any wakeups that happened before.
// Do an opportunistic read to get any pending samples.
haveData: platform.IsWindows,
}, nil
}

func newReaderResources(ringbufMap *ebpf.Map) (poller, eventRing, error) {
if ringbufMap.Type() != ebpf.RingBuf && ringbufMap.Type() != ebpf.WindowsRingBuf {
return nil, fmt.Errorf("invalid Map type: %s", ringbufMap.Type())
return nil, nil, fmt.Errorf("invalid Map type: %s", ringbufMap.Type())
}

maxEntries := int(ringbufMap.MaxEntries())
if maxEntries == 0 || (maxEntries&(maxEntries-1)) != 0 {
return nil, fmt.Errorf("ringbuffer map size %d is zero or not a power of two", maxEntries)
return nil, nil, fmt.Errorf("ringbuffer map size %d is zero or not a power of two", maxEntries)
}

poller, err := newPoller(ringbufMap.FD())
if err != nil {
return nil, err
return nil, nil, err
}

ring, err := newRingBufEventRing(ringbufMap.FD(), maxEntries)
if err != nil {
poller.Close()
return nil, fmt.Errorf("failed to create ringbuf ring: %w", err)
return nil, nil, fmt.Errorf("failed to create ringbuf ring: %w", err)
}

return &Reader{
poller: poller,
ring: ring,
bufferSize: ring.size(),
// On Windows, the wait handle is only set when the reader is created,
// so we miss any wakeups that happened before.
// Do an opportunistic read to get any pending samples.
haveData: platform.IsWindows,
}, nil
return poller, ring, nil
}

// Close frees resources used by the reader.
//
// It interrupts calls to Read.
// It interrupts calls to Read and ReadInto.
func (r *Reader) Close() error {
if err := r.poller.Close(); err != nil {
if errors.Is(err, os.ErrClosed) {
Expand Down Expand Up @@ -162,43 +172,21 @@ func (r *Reader) ReadInto(rec *Record) error {
r.mu.Lock()
defer r.mu.Unlock()

if r.ring == nil {
return fmt.Errorf("ringbuffer: %w", ErrClosed)
}

for {
if !r.haveData {
if pe := r.pendingErr; pe != nil {
r.pendingErr = nil
return pe
return readWithPoll(r.poller, r.ring, r.deadline, &r.haveData, &r.pendingErr, func() error {
return r.ring.readRecordFunc(func(sample []byte, remaining int, cons uintptr) error {
n := len(sample)
if cap(rec.RawSample) < n {
rec.RawSample = make([]byte, n)
} else {
rec.RawSample = rec.RawSample[:n]
}

err := r.poller.Wait(r.deadline)
if errors.Is(err, os.ErrDeadlineExceeded) || errors.Is(err, ErrFlushed) {
// Ignoring this for reading a valid entry after timeout or flush.
// This can occur if the producer submitted to the ring buffer
// with BPF_RB_NO_WAKEUP.
r.pendingErr = err
} else if err != nil {
return err
}
r.haveData = true
}

for {
err := r.ring.readRecord(rec)
// Not using errors.Is which is quite a bit slower
// For a tight loop it might make a difference
if err == errBusy {
continue
}
if err == errEOR {
r.haveData = false
break
}
return err
}
}
copy(rec.RawSample, sample)
rec.Remaining = remaining
r.ring.commitRecord(cons)
return nil
})
})
}

// BufferSize returns the size in bytes of the ring buffer
Expand Down
22 changes: 9 additions & 13 deletions ringbuf/ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,12 @@ func (rr *ringReader) AvailableBytes() uint64 {
return uint64(prod - cons)
}

// Read a record from an event ring.
func (rr *ringReader) readRecord(rec *Record) error {
func (rr *ringReader) commitRecord(cons uintptr) {
atomic.StoreUintptr(rr.cons_pos, cons)
}

// Read a record from an event ring and invoke f with a zero-copy sample view.
func (rr *ringReader) readRecordFunc(f func(sample []byte, remaining int, cons uintptr) error) error {
prod := atomic.LoadUintptr(rr.prod_pos)
cons := atomic.LoadUintptr(rr.cons_pos)

Expand Down Expand Up @@ -83,19 +87,11 @@ func (rr *ringReader) readRecord(rec *Record) error {
// when the record header indicates that the data should be
// discarded, we skip it by just updating the consumer position
// to the next record.
atomic.StoreUintptr(rr.cons_pos, cons)
rr.commitRecord(cons)
continue
}

if n := header.dataLen(); cap(rec.RawSample) < n {
rec.RawSample = make([]byte, n)
} else {
rec.RawSample = rec.RawSample[:n]
}

copy(rec.RawSample, rr.ring[start:])
rec.Remaining = int(prod - cons)
atomic.StoreUintptr(rr.cons_pos, cons)
return nil
n := header.dataLen()
return f(rr.ring[start:start+uintptr(n)], int(prod-cons), cons)
}
}
170 changes: 170 additions & 0 deletions ringbuf/unsafe_reader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
package ringbuf

import (
"errors"
"os"
"time"

"github.com/cilium/ebpf"
"github.com/cilium/ebpf/internal/platform"
)

// UnsafeReader allows reading ringbuf records without copying sample data.
//
// UnsafeReader is not safe for concurrent use.
//
// Samples returned from Read / ReadInto remain valid only until Commit is called.
// Samples passed to ReadFunc are valid only during callback execution.
type UnsafeReader struct {
poller poller
ring eventRing

haveData bool
deadline time.Time
bufferSize int
pendingErr error

pendingCons uintptr
pendingRead bool
}

// NewUnsafeReader creates a new BPF ringbuf reader exposing zero-copy APIs.
func NewUnsafeReader(ringbufMap *ebpf.Map) (*UnsafeReader, error) {
poller, ring, err := newReaderResources(ringbufMap)
if err != nil {
return nil, err
}

return &UnsafeReader{
poller: poller,
ring: ring,
bufferSize: ring.size(),
// On Windows, the wait handle is only set when the reader is created,
// so we miss any wakeups that happened before.
// Do an opportunistic read to get any pending samples.
haveData: platform.IsWindows,
}, nil
}

// SetDeadline controls how long Read, ReadInto and ReadFunc will block waiting for samples.
//
// Passing a zero time.Time will remove the deadline.
func (r *UnsafeReader) SetDeadline(t time.Time) {
r.deadline = t
}

// Read the next record from the BPF ringbuf.
//
// Calling [UnsafeReader.Close] interrupts the method with [os.ErrClosed].
// Calling [UnsafeReader.Flush] makes it return all records currently in the ring
// buffer, followed by [ErrFlushed].
//
// Returns [os.ErrDeadlineExceeded] if a deadline was set and after all records
// have been read from the ring.
//
// The returned sample aliases the ring buffer and remains valid until Commit is
// called.
func (r *UnsafeReader) Read() (Record, error) {
var rec Record
err := r.ReadInto(&rec)
return rec, err
}

// ReadInto is like Read except that it allows reusing Record.
//
// ReadInto does not copy sample bytes. rec.RawSample aliases ring buffer memory
// and remains valid until Commit is called.
func (r *UnsafeReader) ReadInto(rec *Record) error {
if r.pendingRead {
return errors.New("ringbuffer: previous record must be committed")
}

return readWithPoll(r.poller, r.ring, r.deadline, &r.haveData, &r.pendingErr, func() error {
return r.ring.readRecordFunc(func(sample []byte, remaining int, cons uintptr) error {
rec.RawSample = sample
rec.Remaining = remaining
r.pendingRead = true
r.pendingCons = cons
return nil
})
})
}

// ReadFunc reads and processes one record via callback.
//
// The callback receives a sample view into ring buffer memory, which is valid
// only for the duration of the callback. The consumed record is committed even
// if the callback returns an error.
//
// The returned value is the minimum bytes remaining in the ring buffer after
// this record has been consumed.
func (r *UnsafeReader) ReadFunc(f func(sample []byte, remaining int) error) (int, error) {
if r.pendingRead {
return 0, errors.New("ringbuffer: previous record must be committed")
}

var (
rec Record
err error
)

err = readWithPoll(r.poller, r.ring, r.deadline, &r.haveData, &r.pendingErr, func() error {
return r.ring.readRecordFunc(func(sample []byte, remaining int, cons uintptr) error {
defer r.ring.commitRecord(cons)

callErr := f(sample, remaining)
rec.Remaining = remaining
return callErr
})
})
return rec.Remaining, err
}

// Commit advances the reader past the most recently read record.
func (r *UnsafeReader) Commit() error {
if !r.pendingRead {
return errors.New("ringbuffer: no pending record to commit")
}

r.ring.commitRecord(r.pendingCons)
r.pendingCons = 0
r.pendingRead = false
return nil
}

// Close frees resources used by the reader.
//
// It interrupts calls to Read, ReadInto and ReadFunc.
func (r *UnsafeReader) Close() error {
if err := r.poller.Close(); err != nil {
if errors.Is(err, os.ErrClosed) {
return nil
}
return err
}

var err error
if r.ring != nil {
err = r.ring.Close()
r.ring = nil
}

return err
}

// BufferSize returns the size in bytes of the ring buffer.
func (r *UnsafeReader) BufferSize() int {
return r.bufferSize
}

// Flush unblocks Read/ReadInto/ReadFunc and successive Read/ReadInto/ReadFunc calls return pending
// samples at this point, until ErrFlushed is returned.
func (r *UnsafeReader) Flush() error {
return r.poller.Flush()
}

// AvailableBytes returns the amount of data available to read in the ring
// buffer in bytes.
func (r *UnsafeReader) AvailableBytes() int {
return int(r.ring.AvailableBytes())
}
Loading
Loading