diff --git a/ringbuf/doc.go b/ringbuf/doc.go index eb9617e63..3988886c5 100644 --- a/ringbuf/doc.go +++ b/ringbuf/doc.go @@ -3,4 +3,7 @@ // BPF allows submitting custom events to a BPF ring buffer map set up // by userspace. This is very useful to push things like packet samples // from BPF to a daemon running in user space. +// +// UnsafeReader provides zero-copy access to samples and is not safe for +// concurrent use. package ringbuf diff --git a/ringbuf/read_internal.go b/ringbuf/read_internal.go new file mode 100644 index 000000000..5ee849d4d --- /dev/null +++ b/ringbuf/read_internal.go @@ -0,0 +1,48 @@ +package ringbuf + +import ( + "errors" + "fmt" + "os" + "time" +) + +func readWithPoll(poller poller, ring eventRing, deadline time.Time, haveData *bool, pendingErr *error, read func() error) error { + if ring == nil { + return fmt.Errorf("ringbuffer: %w", ErrClosed) + } + + for { + if !*haveData { + if pe := *pendingErr; pe != nil { + *pendingErr = nil + return pe + } + + err := poller.Wait(deadline) + if errors.Is(err, os.ErrDeadlineExceeded) || errors.Is(err, ErrFlushed) { + // Ignoring this for reading a valid entry after timeout or flush. + // This can occur if the producer submitted to the ring buffer + // with BPF_RB_NO_WAKEUP. + *pendingErr = err + } else if err != nil { + return err + } + *haveData = true + } + + for { + err := read() + // Not using errors.Is which is quite a bit slower. + // For a tight loop it might make a difference. + if err == errBusy { + continue + } + if err == errEOR { + *haveData = false + break + } + return err + } + } +} diff --git a/ringbuf/reader.go b/ringbuf/reader.go index d1483d78a..e2a54c093 100644 --- a/ringbuf/reader.go +++ b/ringbuf/reader.go @@ -30,7 +30,8 @@ type poller interface { type eventRing interface { size() int AvailableBytes() uint64 - readRecord(rec *Record) error + readRecordFunc(func(sample []byte, remaining int, cons uintptr) error) error + commitRecord(cons uintptr) Close() error } @@ -77,40 +78,49 @@ type Reader struct { // NewReader creates a new BPF ringbuf reader. func NewReader(ringbufMap *ebpf.Map) (*Reader, error) { + poller, ring, err := newReaderResources(ringbufMap) + if err != nil { + return nil, err + } + + return &Reader{ + poller: poller, + ring: ring, + bufferSize: ring.size(), + // On Windows, the wait handle is only set when the reader is created, + // so we miss any wakeups that happened before. + // Do an opportunistic read to get any pending samples. + haveData: platform.IsWindows, + }, nil +} + +func newReaderResources(ringbufMap *ebpf.Map) (poller, eventRing, error) { if ringbufMap.Type() != ebpf.RingBuf && ringbufMap.Type() != ebpf.WindowsRingBuf { - return nil, fmt.Errorf("invalid Map type: %s", ringbufMap.Type()) + return nil, nil, fmt.Errorf("invalid Map type: %s", ringbufMap.Type()) } maxEntries := int(ringbufMap.MaxEntries()) if maxEntries == 0 || (maxEntries&(maxEntries-1)) != 0 { - return nil, fmt.Errorf("ringbuffer map size %d is zero or not a power of two", maxEntries) + return nil, nil, fmt.Errorf("ringbuffer map size %d is zero or not a power of two", maxEntries) } poller, err := newPoller(ringbufMap.FD()) if err != nil { - return nil, err + return nil, nil, err } ring, err := newRingBufEventRing(ringbufMap.FD(), maxEntries) if err != nil { poller.Close() - return nil, fmt.Errorf("failed to create ringbuf ring: %w", err) + return nil, nil, fmt.Errorf("failed to create ringbuf ring: %w", err) } - return &Reader{ - poller: poller, - ring: ring, - bufferSize: ring.size(), - // On Windows, the wait handle is only set when the reader is created, - // so we miss any wakeups that happened before. - // Do an opportunistic read to get any pending samples. - haveData: platform.IsWindows, - }, nil + return poller, ring, nil } // Close frees resources used by the reader. // -// It interrupts calls to Read. +// It interrupts calls to Read and ReadInto. func (r *Reader) Close() error { if err := r.poller.Close(); err != nil { if errors.Is(err, os.ErrClosed) { @@ -162,43 +172,21 @@ func (r *Reader) ReadInto(rec *Record) error { r.mu.Lock() defer r.mu.Unlock() - if r.ring == nil { - return fmt.Errorf("ringbuffer: %w", ErrClosed) - } - - for { - if !r.haveData { - if pe := r.pendingErr; pe != nil { - r.pendingErr = nil - return pe + return readWithPoll(r.poller, r.ring, r.deadline, &r.haveData, &r.pendingErr, func() error { + return r.ring.readRecordFunc(func(sample []byte, remaining int, cons uintptr) error { + n := len(sample) + if cap(rec.RawSample) < n { + rec.RawSample = make([]byte, n) + } else { + rec.RawSample = rec.RawSample[:n] } - err := r.poller.Wait(r.deadline) - if errors.Is(err, os.ErrDeadlineExceeded) || errors.Is(err, ErrFlushed) { - // Ignoring this for reading a valid entry after timeout or flush. - // This can occur if the producer submitted to the ring buffer - // with BPF_RB_NO_WAKEUP. - r.pendingErr = err - } else if err != nil { - return err - } - r.haveData = true - } - - for { - err := r.ring.readRecord(rec) - // Not using errors.Is which is quite a bit slower - // For a tight loop it might make a difference - if err == errBusy { - continue - } - if err == errEOR { - r.haveData = false - break - } - return err - } - } + copy(rec.RawSample, sample) + rec.Remaining = remaining + r.ring.commitRecord(cons) + return nil + }) + }) } // BufferSize returns the size in bytes of the ring buffer diff --git a/ringbuf/ring.go b/ringbuf/ring.go index abb704cd5..dd89338d2 100644 --- a/ringbuf/ring.go +++ b/ringbuf/ring.go @@ -41,8 +41,12 @@ func (rr *ringReader) AvailableBytes() uint64 { return uint64(prod - cons) } -// Read a record from an event ring. -func (rr *ringReader) readRecord(rec *Record) error { +func (rr *ringReader) commitRecord(cons uintptr) { + atomic.StoreUintptr(rr.cons_pos, cons) +} + +// Read a record from an event ring and invoke f with a zero-copy sample view. +func (rr *ringReader) readRecordFunc(f func(sample []byte, remaining int, cons uintptr) error) error { prod := atomic.LoadUintptr(rr.prod_pos) cons := atomic.LoadUintptr(rr.cons_pos) @@ -83,19 +87,11 @@ func (rr *ringReader) readRecord(rec *Record) error { // when the record header indicates that the data should be // discarded, we skip it by just updating the consumer position // to the next record. - atomic.StoreUintptr(rr.cons_pos, cons) + rr.commitRecord(cons) continue } - if n := header.dataLen(); cap(rec.RawSample) < n { - rec.RawSample = make([]byte, n) - } else { - rec.RawSample = rec.RawSample[:n] - } - - copy(rec.RawSample, rr.ring[start:]) - rec.Remaining = int(prod - cons) - atomic.StoreUintptr(rr.cons_pos, cons) - return nil + n := header.dataLen() + return f(rr.ring[start:start+uintptr(n)], int(prod-cons), cons) } } diff --git a/ringbuf/unsafe_reader.go b/ringbuf/unsafe_reader.go new file mode 100644 index 000000000..c5b1c9833 --- /dev/null +++ b/ringbuf/unsafe_reader.go @@ -0,0 +1,170 @@ +package ringbuf + +import ( + "errors" + "os" + "time" + + "github.com/cilium/ebpf" + "github.com/cilium/ebpf/internal/platform" +) + +// UnsafeReader allows reading ringbuf records without copying sample data. +// +// UnsafeReader is not safe for concurrent use. +// +// Samples returned from Read / ReadInto remain valid only until Commit is called. +// Samples passed to ReadFunc are valid only during callback execution. +type UnsafeReader struct { + poller poller + ring eventRing + + haveData bool + deadline time.Time + bufferSize int + pendingErr error + + pendingCons uintptr + pendingRead bool +} + +// NewUnsafeReader creates a new BPF ringbuf reader exposing zero-copy APIs. +func NewUnsafeReader(ringbufMap *ebpf.Map) (*UnsafeReader, error) { + poller, ring, err := newReaderResources(ringbufMap) + if err != nil { + return nil, err + } + + return &UnsafeReader{ + poller: poller, + ring: ring, + bufferSize: ring.size(), + // On Windows, the wait handle is only set when the reader is created, + // so we miss any wakeups that happened before. + // Do an opportunistic read to get any pending samples. + haveData: platform.IsWindows, + }, nil +} + +// SetDeadline controls how long Read, ReadInto and ReadFunc will block waiting for samples. +// +// Passing a zero time.Time will remove the deadline. +func (r *UnsafeReader) SetDeadline(t time.Time) { + r.deadline = t +} + +// Read the next record from the BPF ringbuf. +// +// Calling [UnsafeReader.Close] interrupts the method with [os.ErrClosed]. +// Calling [UnsafeReader.Flush] makes it return all records currently in the ring +// buffer, followed by [ErrFlushed]. +// +// Returns [os.ErrDeadlineExceeded] if a deadline was set and after all records +// have been read from the ring. +// +// The returned sample aliases the ring buffer and remains valid until Commit is +// called. +func (r *UnsafeReader) Read() (Record, error) { + var rec Record + err := r.ReadInto(&rec) + return rec, err +} + +// ReadInto is like Read except that it allows reusing Record. +// +// ReadInto does not copy sample bytes. rec.RawSample aliases ring buffer memory +// and remains valid until Commit is called. +func (r *UnsafeReader) ReadInto(rec *Record) error { + if r.pendingRead { + return errors.New("ringbuffer: previous record must be committed") + } + + return readWithPoll(r.poller, r.ring, r.deadline, &r.haveData, &r.pendingErr, func() error { + return r.ring.readRecordFunc(func(sample []byte, remaining int, cons uintptr) error { + rec.RawSample = sample + rec.Remaining = remaining + r.pendingRead = true + r.pendingCons = cons + return nil + }) + }) +} + +// ReadFunc reads and processes one record via callback. +// +// The callback receives a sample view into ring buffer memory, which is valid +// only for the duration of the callback. The consumed record is committed even +// if the callback returns an error. +// +// The returned value is the minimum bytes remaining in the ring buffer after +// this record has been consumed. +func (r *UnsafeReader) ReadFunc(f func(sample []byte, remaining int) error) (int, error) { + if r.pendingRead { + return 0, errors.New("ringbuffer: previous record must be committed") + } + + var ( + rec Record + err error + ) + + err = readWithPoll(r.poller, r.ring, r.deadline, &r.haveData, &r.pendingErr, func() error { + return r.ring.readRecordFunc(func(sample []byte, remaining int, cons uintptr) error { + defer r.ring.commitRecord(cons) + + callErr := f(sample, remaining) + rec.Remaining = remaining + return callErr + }) + }) + return rec.Remaining, err +} + +// Commit advances the reader past the most recently read record. +func (r *UnsafeReader) Commit() error { + if !r.pendingRead { + return errors.New("ringbuffer: no pending record to commit") + } + + r.ring.commitRecord(r.pendingCons) + r.pendingCons = 0 + r.pendingRead = false + return nil +} + +// Close frees resources used by the reader. +// +// It interrupts calls to Read, ReadInto and ReadFunc. +func (r *UnsafeReader) Close() error { + if err := r.poller.Close(); err != nil { + if errors.Is(err, os.ErrClosed) { + return nil + } + return err + } + + var err error + if r.ring != nil { + err = r.ring.Close() + r.ring = nil + } + + return err +} + +// BufferSize returns the size in bytes of the ring buffer. +func (r *UnsafeReader) BufferSize() int { + return r.bufferSize +} + +// Flush unblocks Read/ReadInto/ReadFunc and successive Read/ReadInto/ReadFunc calls return pending +// samples at this point, until ErrFlushed is returned. +func (r *UnsafeReader) Flush() error { + return r.poller.Flush() +} + +// AvailableBytes returns the amount of data available to read in the ring +// buffer in bytes. +func (r *UnsafeReader) AvailableBytes() int { + return int(r.ring.AvailableBytes()) +} diff --git a/ringbuf/unsafe_reader_test.go b/ringbuf/unsafe_reader_test.go new file mode 100644 index 000000000..670861d3f --- /dev/null +++ b/ringbuf/unsafe_reader_test.go @@ -0,0 +1,121 @@ +package ringbuf + +import ( + "encoding/binary" + "errors" + "testing" + "time" + + "github.com/go-quicktest/qt" + + "github.com/cilium/ebpf/internal" + "github.com/cilium/ebpf/internal/sys" +) + +type testPoller struct{} + +func (testPoller) Wait(deadline time.Time) error { return nil } +func (testPoller) Flush() error { return nil } +func (testPoller) Close() error { return nil } + +type testEventRing struct { + rr *ringReader +} + +func (r *testEventRing) size() int { + return r.rr.size() +} + +func (r *testEventRing) AvailableBytes() uint64 { + return r.rr.AvailableBytes() +} + +func (r *testEventRing) readRecordFunc(f func(sample []byte, remaining int, cons uintptr) error) error { + return r.rr.readRecordFunc(f) +} + +func (r *testEventRing) commitRecord(cons uintptr) { + r.rr.commitRecord(cons) +} + +func (r *testEventRing) Close() error { + return nil +} + +func newUnsafeReaderForRecords(t *testing.T, samples ...[]byte) (*UnsafeReader, *uintptr, *uintptr) { + t.Helper() + + data := make([]byte, 512) + var ( + prod uintptr + cons uintptr + ) + + offset := uintptr(0) + for _, sample := range samples { + binary.LittleEndian.PutUint32(data[offset:], uint32(len(sample))) + offset += sys.BPF_RINGBUF_HDR_SZ + copy(data[offset:offset+uintptr(len(sample))], sample) + offset += uintptr(internal.Align(len(sample), 8)) + } + prod = offset + + rr := newRingReader(&cons, &prod, data) + er := &testEventRing{rr: rr} + reader := &UnsafeReader{ + poller: testPoller{}, + ring: er, + haveData: true, + bufferSize: rr.size(), + } + + return reader, &cons, &prod +} + +func TestUnsafeReaderReadIntoRequiresCommit(t *testing.T) { + reader, cons, _ := newUnsafeReaderForRecords(t, []byte{1, 2, 3}, []byte{4, 5}) + + var rec Record + err := reader.ReadInto(&rec) + qt.Assert(t, qt.IsNil(err)) + qt.Assert(t, qt.DeepEquals(rec.RawSample, []byte{1, 2, 3})) + qt.Assert(t, qt.Equals(*cons, uintptr(0))) + + err = reader.ReadInto(&rec) + qt.Assert(t, qt.ErrorMatches(err, "ringbuffer: previous record must be committed")) + + err = reader.Commit() + qt.Assert(t, qt.IsNil(err)) + qt.Assert(t, qt.Not(qt.Equals(*cons, uintptr(0)))) + firstCons := *cons + + err = reader.ReadInto(&rec) + qt.Assert(t, qt.IsNil(err)) + qt.Assert(t, qt.DeepEquals(rec.RawSample, []byte{4, 5})) + qt.Assert(t, qt.Equals(*cons, firstCons)) + + err = reader.Commit() + qt.Assert(t, qt.IsNil(err)) + qt.Assert(t, qt.Not(qt.Equals(*cons, firstCons))) +} + +func TestUnsafeReaderReadFuncAlwaysCommits(t *testing.T) { + reader, cons, prod := newUnsafeReaderForRecords(t, []byte{1, 2, 3}) + + wantErr := errors.New("callback failed") + remaining, err := reader.ReadFunc(func(sample []byte, remaining int) error { + qt.Assert(t, qt.DeepEquals(sample, []byte{1, 2, 3})) + qt.Assert(t, qt.Equals(remaining, 0)) + return wantErr + }) + + qt.Assert(t, qt.Equals(remaining, 0)) + qt.Assert(t, qt.Equals(err, wantErr)) + qt.Assert(t, qt.Equals(*cons, *prod)) +} + +func TestUnsafeReaderCommitWithoutPendingRecord(t *testing.T) { + reader, _, _ := newUnsafeReaderForRecords(t) + err := reader.Commit() + qt.Assert(t, qt.ErrorMatches(err, "ringbuffer: no pending record to commit")) +}