Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 16 additions & 2 deletions ringbuf/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func (r *Reader) SetDeadline(t time.Time) {
// Returns [os.ErrDeadlineExceeded] if a deadline was set and after all records
// have been read from the ring.
//
// See [ReadInto] for a more efficient version of this method.
// See [ReadInto] or [ReadZeroCopy] for more efficient versions of this method.
func (r *Reader) Read() (Record, error) {
var rec Record
err := r.ReadInto(&rec)
Expand All @@ -144,6 +144,20 @@ func (r *Reader) Read() (Record, error) {

// ReadInto is like Read except that it allows reusing Record and associated buffers.
func (r *Reader) ReadInto(rec *Record) error {
return r.readLockedPoll(func() error {
return r.ring.readRecord(rec)
})
}

// ReadZeroCopy reads the next record from the BPF ringbuf using a zero-copy callback.
// The sample slice is only valid until the callback returns.
func (r *Reader) ReadZeroCopy(f func(sample []byte, remaining int) error) error {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Following the code, I think this function has the potential be used incorrectly?

var saved []byte
reader.ReadZeroCopy(func(sample []byte, remaining int) error {
    saved = sample  // retains reference to temporary memory?
    return nil
})
// is saved still valid at this point?

return r.readLockedPoll(func() error {
return r.ring.readRecordZeroCopy(f)
})
}

func (r *Reader) readLockedPoll(read func() error) error {
r.mu.Lock()
defer r.mu.Unlock()

Expand Down Expand Up @@ -171,7 +185,7 @@ func (r *Reader) ReadInto(rec *Record) error {
}

for {
err := r.ring.readRecord(rec)
err := read()
// Not using errors.Is which is quite a bit slower
// For a tight loop it might make a difference
if err == errBusy {
Expand Down
27 changes: 18 additions & 9 deletions ringbuf/ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,21 @@ func (rr *ringReader) AvailableBytes() uint64 {

// Read a record from an event ring.
func (rr *ringReader) readRecord(rec *Record) error {
return rr.readRecordZeroCopy(func(sample []byte, remaining int) error {
n := len(sample)
if cap(rec.RawSample) < n {
rec.RawSample = make([]byte, n)
} else {
rec.RawSample = rec.RawSample[:n]
}
copy(rec.RawSample, sample)
rec.Remaining = remaining

return nil
})
}

func (rr *ringReader) readRecordZeroCopy(f func(sample []byte, remaining int) error) error {
prod := atomic.LoadUintptr(rr.prod_pos)
cons := atomic.LoadUintptr(rr.cons_pos)

Expand Down Expand Up @@ -87,15 +102,9 @@ func (rr *ringReader) readRecord(rec *Record) error {
continue
}

if n := header.dataLen(); cap(rec.RawSample) < n {
rec.RawSample = make([]byte, n)
} else {
rec.RawSample = rec.RawSample[:n]
}

copy(rec.RawSample, rr.ring[start:])
rec.Remaining = int(prod - cons)
n := header.dataLen()
err := f(rr.ring[start:start+uintptr(n)], int(prod-cons))
atomic.StoreUintptr(rr.cons_pos, cons)
return nil
return err
}
}