Skip to content

Commit 67b846d

Browse files
committed
ringbuf: add zero-copy record iterators
- expose Reader.Records/RecordsInto yielding zero-copy RawSample views
- factor readSample/readLocked helpers; keep Read/ReadInto copy path
- cap RawSample cap to len and store consumer offset after each yield
- add iterator coverage in tests
1 parent df5c309 commit 67b846d

File tree

5 files changed

+120
-18
lines changed

5 files changed

+120
-18
lines changed

btf/testdata/btf_testmod.btf

0 Bytes
Binary file not shown.

btf/testdata/vmlinux.btf.gz

-13.8 KB
Binary file not shown.

ringbuf/reader.go

Lines changed: 62 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,12 @@ import (
55
"fmt"
66
"os"
77
"sync"
8+
"sync/atomic"
89
"time"
910
"unsafe"
1011

12+
"iter"
13+
1114
"github.com/cilium/ebpf"
1215
"github.com/cilium/ebpf/internal/platform"
1316
"github.com/cilium/ebpf/internal/sys"
@@ -40,6 +43,11 @@ func (rh *ringbufHeader) dataLen() int {
4043
}
4144

4245
type Record struct {
46+
// RawSample contains the raw bytes of a ringbuf record.
47+
//
48+
// When obtained via [Reader.Records], RawSample is a zero-copy view into the
49+
// underlying mmap. It is only valid until the iterator yields the next
50+
// record or terminates. Callers must copy the data if they need to retain it.
4351
RawSample []byte
4452

4553
// The minimum number of bytes remaining in the ring buffer after this Record has been read.
@@ -144,6 +152,59 @@ func (r *Reader) Read() (Record, error) {
144152

145153
// ReadInto is like Read except that it allows reusing Record and associated buffers.
146154
func (r *Reader) ReadInto(rec *Record) error {
155+
return r.readLocked(func() error {
156+
return r.ring.readRecord(rec)
157+
})
158+
}
159+
160+
// Records iterates over records in the reader until [Reader.Close] is called.
161+
//
162+
// Record.RawSample is only valid until the next call to the iterator. Callers
163+
// must copy the data if it needs to outlive the current iteration.
164+
//
165+
// This convenience wrapper allocates a single Record once. To fully avoid
166+
// allocations, use [Reader.RecordsInto] and pass in a reusable Record.
167+
func (r *Reader) Records() iter.Seq2[*Record, error] {
168+
rec := Record{}
169+
return r.RecordsInto(&rec)
170+
}
171+
172+
// RecordsInto is like Records but allows reusing the Record and associated buffers.
173+
func (r *Reader) RecordsInto(rec *Record) iter.Seq2[*Record, error] {
174+
return func(yield func(*Record, error) bool) {
175+
var (
176+
sample []byte
177+
remaining int
178+
nextCons uintptr
179+
)
180+
181+
for {
182+
err := r.readLocked(func() error {
183+
var err error
184+
sample, remaining, nextCons, err = r.ring.readSample()
185+
return err
186+
})
187+
if err != nil {
188+
yield(nil, err)
189+
return
190+
}
191+
192+
// Limit cap to len so append can't write past the record and corrupt the ring.
193+
rec.RawSample = sample[:len(sample):len(sample)]
194+
rec.Remaining = remaining
195+
196+
if !yield(rec, nil) {
197+
atomic.StoreUintptr(r.ring.cons_pos, nextCons)
198+
return
199+
}
200+
201+
atomic.StoreUintptr(r.ring.cons_pos, nextCons)
202+
}
203+
}
204+
}
205+
206+
// readLocked drives the polling / data-availability loop shared by Record reads.
207+
func (r *Reader) readLocked(read func() error) error {
147208
r.mu.Lock()
148209
defer r.mu.Unlock()
149210

@@ -171,9 +232,7 @@ func (r *Reader) ReadInto(rec *Record) error {
171232
}
172233

173234
for {
174-
err := r.ring.readRecord(rec)
175-
// Not using errors.Is which is quite a bit slower
176-
// For a tight loop it might make a difference
235+
err := read()
177236
if err == errBusy {
178237
continue
179238
}

ringbuf/reader_test.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -384,3 +384,34 @@ func BenchmarkReadInto(b *testing.B) {
384384
}
385385
}
386386
}
387+
388+
func TestRecordsIterator(t *testing.T) {
389+
testutils.SkipOnOldKernel(t, "5.8", "BPF ring buffer")
390+
391+
prog, events := mustOutputSamplesProg(t,
392+
sampleMessage{size: 5, flags: 0},
393+
sampleMessage{size: 7, flags: 0},
394+
)
395+
mustRun(t, prog)
396+
397+
rd, err := NewReader(events)
398+
if err != nil {
399+
t.Fatal(err)
400+
}
401+
defer rd.Close()
402+
403+
var seen [][]byte
404+
for rec, err := range rd.Records() {
405+
if err != nil {
406+
t.Fatalf("iteration error: %v", err)
407+
}
408+
seen = append(seen, append([]byte(nil), rec.RawSample...))
409+
if len(seen) == 2 {
410+
break
411+
}
412+
}
413+
414+
qt.Assert(t, qt.Equals(len(seen), 2))
415+
qt.Assert(t, qt.DeepEquals(seen[0], []byte{1, 2, 3, 4, 4}))
416+
qt.Assert(t, qt.DeepEquals(seen[1], []byte{1, 2, 3, 4, 4, 3, 2}))
417+
}

ringbuf/ring.go

Lines changed: 27 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -41,16 +41,17 @@ func (rr *ringReader) AvailableBytes() uint64 {
4141
return uint64(prod - cons)
4242
}
4343

44-
// Read a record from an event ring.
45-
func (rr *ringReader) readRecord(rec *Record) error {
44+
// readSample returns a zero-copy view into the next sample, together with the
45+
// consumer position that should be stored to release the data.
46+
func (rr *ringReader) readSample() (sample []byte, remaining int, nextCons uintptr, err error) {
4647
prod := atomic.LoadUintptr(rr.prod_pos)
4748
cons := atomic.LoadUintptr(rr.cons_pos)
4849

4950
for {
5051
if remaining := prod - cons; remaining == 0 {
51-
return errEOR
52+
return nil, 0, 0, errEOR
5253
} else if remaining < sys.BPF_RINGBUF_HDR_SZ {
53-
return fmt.Errorf("read record header: %w", io.ErrUnexpectedEOF)
54+
return nil, 0, 0, fmt.Errorf("read record header: %w", io.ErrUnexpectedEOF)
5455
}
5556

5657
// read the len field of the header atomically to ensure a happens before
@@ -65,15 +66,15 @@ func (rr *ringReader) readRecord(rec *Record) error {
6566
// the next sample in the ring is not committed yet so we
6667
// exit without storing the reader/consumer position
6768
// and start again from the same position.
68-
return errBusy
69+
return nil, 0, 0, errBusy
6970
}
7071

7172
cons += sys.BPF_RINGBUF_HDR_SZ
7273

7374
// Data is always padded to 8 byte alignment.
7475
dataLenAligned := uintptr(internal.Align(header.dataLen(), 8))
7576
if remaining := prod - cons; remaining < dataLenAligned {
76-
return fmt.Errorf("read sample data: %w", io.ErrUnexpectedEOF)
77+
return nil, 0, 0, fmt.Errorf("read sample data: %w", io.ErrUnexpectedEOF)
7778
}
7879

7980
start = cons & rr.mask
@@ -87,15 +88,26 @@ func (rr *ringReader) readRecord(rec *Record) error {
8788
continue
8889
}
8990

90-
if n := header.dataLen(); cap(rec.RawSample) < n {
91-
rec.RawSample = make([]byte, n)
92-
} else {
93-
rec.RawSample = rec.RawSample[:n]
94-
}
91+
end := int(start) + header.dataLen()
92+
return rr.ring[start:end], int(prod - cons), cons, nil
93+
}
94+
}
9595

96-
copy(rec.RawSample, rr.ring[start:])
97-
rec.Remaining = int(prod - cons)
98-
atomic.StoreUintptr(rr.cons_pos, cons)
99-
return nil
96+
// Read a record from an event ring, copying the sample into the provided Record.
97+
func (rr *ringReader) readRecord(rec *Record) error {
98+
sample, remaining, nextCons, err := rr.readSample()
99+
if err != nil {
100+
return err
101+
}
102+
103+
if n := len(sample); cap(rec.RawSample) < n {
104+
rec.RawSample = make([]byte, n)
105+
} else {
106+
rec.RawSample = rec.RawSample[:n]
100107
}
108+
109+
copy(rec.RawSample, sample)
110+
rec.Remaining = remaining
111+
atomic.StoreUintptr(rr.cons_pos, nextCons)
112+
return nil
101113
}

0 commit comments

Comments
 (0)