Skip to content

Commit 1c6b21c

Browse files
committed
ringbuf: add zero-copy read with deferred consumer advancement
Signed-off-by: Ori Shussman <orishuss@gmail.com>
1 parent 729742b commit 1c6b21c

File tree

5 files changed

+281
-13
lines changed

5 files changed

+281
-13
lines changed

ringbuf/reader.go

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@ type eventRing interface {
3131
size() int
3232
AvailableBytes() uint64
3333
readRecord(rec *Record) error
34+
readRecordUnsafe(rec *Record) error
35+
advance()
3436
Close() error
3537
}
3638

@@ -59,6 +61,10 @@ type Record struct {
5961

6062
// The minimum number of bytes remaining in the ring buffer after this Record has been read.
6163
Remaining int
64+
65+
// Set by readRecordZeroCopy to prevent readRecord from reusing
66+
// RawSample as a write destination (it points into read-only mmap memory).
67+
isReadOnly bool
6268
}
6369

6470
// Reader allows reading bpf_ringbuf_output
@@ -159,6 +165,40 @@ func (r *Reader) Read() (Record, error) {
159165

160166
// ReadInto is like Read except that it allows reusing Record and associated buffers.
161167
func (r *Reader) ReadInto(rec *Record) error {
168+
return r.readWait(func() error {
169+
return r.ring.readRecord(rec)
170+
})
171+
}
172+
173+
// Like [Reader.ReadInto], but returns a zero-copy slice into the ring buffer's
174+
// memory-mapped region instead of copying. The slice is valid until [Reader.Commit].
175+
//
176+
// RawSample points into read-only memory and must not be written to. If the
177+
// Record is later passed to [Reader.ReadInto], the existing buffer will not be
178+
// reused for the same reason; a new allocation will occur instead.
179+
//
180+
// Does not advance the consumer position. Call [Reader.Commit] after processing
181+
// to release space. Must not be mixed with [Reader.Read] or [Reader.ReadInto]
182+
// between Commit calls, or an ErrNotCommitted error will be returned.
183+
func (r *Reader) ReadUnsafe(rec *Record) error {
184+
return r.readWait(func() error {
185+
return r.ring.readRecordUnsafe(rec)
186+
})
187+
}
188+
189+
// Advances the consumer position, releasing ring buffer space from preceding
190+
// [Reader.ReadZeroCopy] calls. Slices from ReadZeroCopy are invalid after this.
191+
// No-op if there are no pending reads.
192+
func (r *Reader) Commit() {
193+
r.mu.Lock()
194+
defer r.mu.Unlock()
195+
196+
if r.ring != nil {
197+
r.ring.advance()
198+
}
199+
}
200+
201+
func (r *Reader) readWait(read func() error) error {
162202
r.mu.Lock()
163203
defer r.mu.Unlock()
164204

@@ -186,7 +226,7 @@ func (r *Reader) ReadInto(rec *Record) error {
186226
}
187227

188228
for {
189-
err := r.ring.readRecord(rec)
229+
err := read()
190230
// Not using errors.Is which is quite a bit slower
191231
// For a tight loop it might make a difference
192232
if err == errBusy {

ringbuf/reader_test.go

Lines changed: 159 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -322,6 +322,165 @@ func TestReadAfterClose(t *testing.T) {
322322
}
323323
}
324324

325+
func TestReadZeroCopySingle(t *testing.T) {
326+
testutils.SkipOnOldKernel(t, "5.8", "BPF ring buffer")
327+
328+
prog, events := mustOutputSamplesProg(t, sampleMessage{size: 5})
329+
330+
rd, err := NewReader(events)
331+
qt.Assert(t, qt.IsNil(err))
332+
defer rd.Close()
333+
334+
mustRun(t, prog)
335+
336+
var rec Record
337+
err = rd.ReadUnsafe(&rec)
338+
qt.Assert(t, qt.IsNil(err))
339+
qt.Assert(t, qt.Equals(len(rec.RawSample), 5))
340+
qt.Assert(t, qt.DeepEquals(rec.RawSample, []byte{1, 2, 3, 4, 4}))
341+
342+
rd.Commit()
343+
qt.Assert(t, qt.Equals(rd.AvailableBytes(), 0))
344+
}
345+
346+
func TestReadZeroCopyMulti(t *testing.T) {
347+
testutils.SkipOnOldKernel(t, "5.8", "BPF ring buffer")
348+
349+
prog, events := mustOutputSamplesProg(t,
350+
sampleMessage{size: 5},
351+
sampleMessage{size: 10, discard: true},
352+
sampleMessage{size: 15},
353+
)
354+
355+
rd, err := NewReader(events)
356+
qt.Assert(t, qt.IsNil(err))
357+
defer rd.Close()
358+
359+
mustRun(t, prog)
360+
361+
var rec Record
362+
363+
err = rd.ReadUnsafe(&rec)
364+
qt.Assert(t, qt.IsNil(err))
365+
qt.Assert(t, qt.Equals(len(rec.RawSample), 5))
366+
367+
err = rd.ReadUnsafe(&rec)
368+
qt.Assert(t, qt.IsNil(err))
369+
qt.Assert(t, qt.Equals(len(rec.RawSample), 15))
370+
371+
rd.Commit()
372+
qt.Assert(t, qt.Equals(rd.AvailableBytes(), 0))
373+
}
374+
375+
func TestReadZeroCopyCommitReleasesSpace(t *testing.T) {
376+
testutils.SkipOnOldKernel(t, "5.8", "BPF ring buffer")
377+
378+
prog, events := mustOutputSamplesProg(t, sampleMessage{size: 5})
379+
380+
rd, err := NewReader(events)
381+
qt.Assert(t, qt.IsNil(err))
382+
defer rd.Close()
383+
384+
mustRun(t, prog)
385+
386+
var rec Record
387+
err = rd.ReadUnsafe(&rec)
388+
qt.Assert(t, qt.IsNil(err))
389+
390+
qt.Assert(t, qt.Not(qt.Equals(rd.AvailableBytes(), 0)))
391+
392+
rd.Commit()
393+
qt.Assert(t, qt.Equals(rd.AvailableBytes(), 0))
394+
395+
mustRun(t, prog)
396+
err = rd.ReadUnsafe(&rec)
397+
qt.Assert(t, qt.IsNil(err))
398+
qt.Assert(t, qt.Equals(len(rec.RawSample), 5))
399+
rd.Commit()
400+
}
401+
402+
func TestReadZeroCopyDeadline(t *testing.T) {
403+
testutils.SkipOnOldKernel(t, "5.8", "BPF ring buffer")
404+
405+
_, events := mustOutputSamplesProg(t, sampleMessage{size: 5})
406+
407+
rd, err := NewReader(events)
408+
qt.Assert(t, qt.IsNil(err))
409+
defer rd.Close()
410+
411+
rd.SetDeadline(time.Now().Add(-time.Second))
412+
413+
var rec Record
414+
err = rd.ReadUnsafe(&rec)
415+
qt.Assert(t, qt.ErrorIs(err, os.ErrDeadlineExceeded))
416+
}
417+
418+
func TestReadIntoRejectsUncommittedZeroCopy(t *testing.T) {
419+
testutils.SkipOnOldKernel(t, "5.8", "BPF ring buffer")
420+
421+
prog, events := mustOutputSamplesProg(t, sampleMessage{size: 5})
422+
423+
rd, err := NewReader(events)
424+
qt.Assert(t, qt.IsNil(err))
425+
defer rd.Close()
426+
427+
mustRun(t, prog)
428+
429+
var rec Record
430+
err = rd.ReadUnsafe(&rec)
431+
qt.Assert(t, qt.IsNil(err))
432+
433+
err = rd.ReadInto(&rec)
434+
qt.Assert(t, qt.ErrorIs(err, ErrNotCommitted))
435+
436+
_, err = rd.Read()
437+
qt.Assert(t, qt.ErrorIs(err, ErrNotCommitted))
438+
439+
rd.Commit()
440+
441+
mustRun(t, prog)
442+
err = rd.ReadInto(&rec)
443+
qt.Assert(t, qt.IsNil(err))
444+
}
445+
446+
func TestCommitNoOp(t *testing.T) {
447+
testutils.SkipOnOldKernel(t, "5.8", "BPF ring buffer")
448+
449+
_, events := mustOutputSamplesProg(t, sampleMessage{size: 5})
450+
451+
rd, err := NewReader(events)
452+
qt.Assert(t, qt.IsNil(err))
453+
defer rd.Close()
454+
455+
rd.Commit()
456+
}
457+
458+
func BenchmarkReadZeroCopy(b *testing.B) {
459+
testutils.SkipOnOldKernel(b, "5.8", "BPF ring buffer")
460+
461+
prog, events := mustOutputSamplesProg(b, sampleMessage{size: 80, flags: 0})
462+
463+
rd, err := NewReader(events)
464+
if err != nil {
465+
b.Fatal(err)
466+
}
467+
defer rd.Close()
468+
469+
b.ReportAllocs()
470+
471+
var rec Record
472+
for b.Loop() {
473+
b.StopTimer()
474+
mustRun(b, prog)
475+
b.StartTimer()
476+
477+
if err := rd.ReadUnsafe(&rec); err != nil {
478+
b.Fatal("Can't read samples:", err)
479+
}
480+
rd.Commit()
481+
}
482+
}
483+
325484
func BenchmarkReader(b *testing.B) {
326485
testutils.SkipOnOldKernel(b, "5.8", "BPF ring buffer")
327486

ringbuf/ring.go

Lines changed: 65 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package ringbuf
22

33
import (
4+
"errors"
45
"fmt"
56
"io"
67
"sync/atomic"
@@ -10,11 +11,18 @@ import (
1011
"github.com/cilium/ebpf/internal/sys"
1112
)
1213

14+
var ErrNotCommitted = errors.New("zero-copy records not yet committed")
15+
1316
type ringReader struct {
1417
// These point into mmap'ed memory and must be accessed atomically.
1518
prod_pos, cons_pos *uintptr
1619
mask uintptr
1720
ring []byte
21+
22+
// Logical consumer position for deferred zero-copy reads.
23+
// Only valid when hasPending is true.
24+
pendingCons uintptr
25+
hasPending bool
1826
}
1927

2028
func newRingReader(cons_ptr, prod_ptr *uintptr, ring []byte) *ringReader {
@@ -41,10 +49,49 @@ func (rr *ringReader) AvailableBytes() uint64 {
4149
return uint64(prod - cons)
4250
}
4351

44-
// Read a record from an event ring.
52+
// Like readRecordZeroCopy, but copies data into rec.RawSample and advances
53+
// the consumer position immediately.
4554
func (rr *ringReader) readRecord(rec *Record) error {
55+
if rr.hasPending {
56+
return ErrNotCommitted
57+
}
58+
59+
buf := rec.RawSample
60+
if rec.isReadOnly {
61+
buf = nil
62+
}
63+
64+
defer func() {
65+
rec.isReadOnly = false
66+
rr.advance()
67+
}()
68+
69+
err := rr.readRecordUnsafe(rec)
70+
if err != nil {
71+
return err
72+
}
73+
74+
n := len(rec.RawSample)
75+
if cap(buf) < n {
76+
buf = make([]byte, n)
77+
} else {
78+
buf = buf[:n]
79+
}
80+
copy(buf, rec.RawSample)
81+
rec.RawSample = buf
82+
83+
return nil
84+
}
85+
86+
// Sets rec.RawSample to a slice of the mmap'd ring buffer memory.
87+
// Does not advance the consumer position; call advance separately.
88+
func (rr *ringReader) readRecordUnsafe(rec *Record) error {
4689
prod := atomic.LoadUintptr(rr.prod_pos)
47-
cons := atomic.LoadUintptr(rr.cons_pos)
90+
91+
cons := rr.pendingCons
92+
if !rr.hasPending {
93+
cons = atomic.LoadUintptr(rr.cons_pos)
94+
}
4895

4996
for {
5097
if remaining := prod - cons; remaining == 0 {
@@ -81,21 +128,27 @@ func (rr *ringReader) readRecord(rec *Record) error {
81128

82129
if header.isDiscard() {
83130
// when the record header indicates that the data should be
84-
// discarded, we skip it by just updating the consumer position
131+
// discarded, we skip it by just updating the pending position
85132
// to the next record.
86-
atomic.StoreUintptr(rr.cons_pos, cons)
133+
rr.pendingCons = cons
134+
rr.hasPending = true
87135
continue
88136
}
89137

90-
if n := header.dataLen(); cap(rec.RawSample) < n {
91-
rec.RawSample = make([]byte, n)
92-
} else {
93-
rec.RawSample = rec.RawSample[:n]
94-
}
95-
96-
copy(rec.RawSample, rr.ring[start:])
138+
n := header.dataLen()
139+
rec.RawSample = rr.ring[start : start+uintptr(n)]
97140
rec.Remaining = int(prod - cons)
98-
atomic.StoreUintptr(rr.cons_pos, cons)
141+
rec.isReadOnly = true
142+
rr.pendingCons = cons
143+
rr.hasPending = true
99144
return nil
100145
}
101146
}
147+
148+
// Commits the pending consumer position from readRecordZeroCopy calls.
149+
func (rr *ringReader) advance() {
150+
if rr.hasPending {
151+
atomic.StoreUintptr(rr.cons_pos, rr.pendingCons)
152+
rr.hasPending = false
153+
}
154+
}

ringbuf/ring_other.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,14 @@ func newRingBufEventRing(mapFD, size int) (*mmapEventRing, error) {
4949
return ring, nil
5050
}
5151

52+
func (ring *mmapEventRing) readRecordUnsafe(rec *Record) error {
53+
return ring.ringReader.readRecordUnsafe(rec)
54+
}
55+
56+
func (ring *mmapEventRing) advance() {
57+
ring.ringReader.advance()
58+
}
59+
5260
func (ring *mmapEventRing) Close() error {
5361
ring.cleanup.Stop()
5462

ringbuf/ring_windows.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,14 @@ func newRingBufEventRing(mapFD, size int) (*windowsEventRing, error) {
6262
return ring, nil
6363
}
6464

65+
func (ring *windowsEventRing) readRecordUnsafe(rec *Record) error {
66+
return ring.ringReader.readRecordUnsafe(rec)
67+
}
68+
69+
func (ring *windowsEventRing) advance() {
70+
ring.ringReader.advance()
71+
}
72+
6573
func (ring *windowsEventRing) Close() error {
6674
ring.cleanup.Stop()
6775

0 commit comments

Comments
 (0)