Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file modified btf/testdata/btf_testmod.btf
Binary file not shown.
Binary file modified btf/testdata/vmlinux.btf.gz
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the reason for the change on this file?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CI told me to run make locally, otherwise it turns red 😬 then those two files are added...

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

poking here -
since these are generated locally - they are bound to your kernel version, ~v6.x, which will render the repo's tests bound to it as well, which is generally not good :)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

poking here - since these are generated locally - they are bound to your kernel version, ~v6.x, which will render the repo's tests bound to it as well, which is generally not good :)

Not really, we tend to be quite anal about reproducible builds and tests. This file is pulled from an image in cilium/ci-kernels, which saw some changes recently. A rebase should fix this.

Binary file not shown.
65 changes: 62 additions & 3 deletions ringbuf/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,12 @@ import (
"fmt"
"os"
"sync"
"sync/atomic"
"time"
"unsafe"

"iter"

"github.com/cilium/ebpf"
"github.com/cilium/ebpf/internal/platform"
"github.com/cilium/ebpf/internal/sys"
Expand Down Expand Up @@ -40,6 +43,11 @@ func (rh *ringbufHeader) dataLen() int {
}

type Record struct {
// RawSample contains the raw bytes of a ringbuf record.
//
// When obtained via [Reader.Records], RawSample is a zero-copy view into the
// underlying mmap. It is only valid until the iterator yields the next
// record or terminates. Callers must copy the data if they need to retain it.
RawSample []byte

// The minimum number of bytes remaining in the ring buffer after this Record has been read.
Expand Down Expand Up @@ -144,6 +152,59 @@ func (r *Reader) Read() (Record, error) {

// ReadInto is like Read except that it allows reusing Record and associated buffers.
func (r *Reader) ReadInto(rec *Record) error {
return r.readLocked(func() error {
return r.ring.readRecord(rec)
})
}

// Records iterates over records in the reader until [Reader.Close] is called.
//
// Record.RawSample is only valid until the next call to the iterator. Callers
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Even more explicit: both Record and Record.RawSample are only valid until the next call.

// must copy the data if it needs to outlive the current iteration.
//
// This convenience wrapper allocates a single Record once. To fully avoid
// allocations, use [Reader.RecordsInto] and pass in a reusable Record.
func (r *Reader) Records() iter.Seq2[*Record, error] {
rec := Record{}
return r.RecordsInto(&rec)
}

// RecordsInto is like Records but allows reusing the Record and associated buffers.
func (r *Reader) RecordsInto(rec *Record) iter.Seq2[*Record, error] {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If Records() can get away with allocating Record once when creating the iterator there isn't much point in this function IMO.

return func(yield func(*Record, error) bool) {
var (
sample []byte
remaining int
nextCons uintptr
)

for {
err := r.readLocked(func() error {
var err error
sample, remaining, nextCons, err = r.ring.readSample()
return err
})
if err != nil {
yield(nil, err)
return
}

// Limit cap to len so append can't write past the record and corrupt the ring.
rec.RawSample = sample[:len(sample):len(sample)]
rec.Remaining = remaining

if !yield(rec, nil) {
atomic.StoreUintptr(r.ring.cons_pos, nextCons)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is safe to do. Consider two concurrent callers of Records():

  • First caller reads record A, offset 10
  • Second caller reads record B, offset 20
  • Second caller finishes the loop quicker than the first, offset 20 is written.
  • Data of the first caller is corrupted.

I think you need to hold the lock across the yield. Maybe its as simple as moving it all into readLocked?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am aware of this concurrent read issue, my first version was holding the lock during the iterator. However, that leads to the problem of dead lock when calling reader.SetDeadline:

for rec, err := range reader.Records() {
    reader.SetDeadline() // dead lock
}

reader.Close() also tries to acquire the reader.mu, it will become impossible to call Close() during iteration 😬

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I drafted a PR on my fork for your preview: jschwinger233#5

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not being able to call Close() from inside the iterator seems fine to me. Are you saying it's not possible to Close() while we are stuck in Wait()? Sounds like a bug to me.

SetDeadline is a problem, that should work. This might mean you need to split the lock somehow. Take a look at the perf reader, this has more complex locking due to pauseMu. IIRC it also drops the lock before waiting, and then acquires it after. Maybe you can use a similar strategy here (which could resolve Close() during Wait()).

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it also drops the lock before waiting, and then acquires it after.

Let me make it clear, so we are talking something like

for rec, err := range reader.Records() {
    // read.mu is released 
    reader.SetDeadline() // no dead lock
}

This strategy doesn't fix the two concurrent callers of Records(), right?

  1. First caller reads record A, offset 10
  2. Second caller reads record B, offset 20
  3. Second caller finishes the loop quicker than the first, offset 20 is written.
  4. Data of the first caller is corrupted.

Consider 1st reader:

for rec, err := range reader.Records() {
    // read.mu is released
    time.Sleep() // for some reason, let's sleep
    handle_rec()
}

2nd reader:

for rec, err := range reader.Records() {
    // read.mu is released, so no blocking when 1st reader is sleeping
    handle_rec()
}

2nd reader will move forward the cosumer_pointer, leaving slower 1st reader's rec invalid.

return
}

atomic.StoreUintptr(r.ring.cons_pos, nextCons)
}
}
}

// readLocked drives the polling / data-availability loop shared by Record reads.
func (r *Reader) readLocked(read func() error) error {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

having read be a closure over Reader state is a bit black magic. How about

Suggested change
func (r *Reader) readLocked(read func() error) error {
func (r *Reader) readLocked(handle func(sample []byte, remaining int) error) error {

readLocked would be responsible for calling readSample and advancing the consumer position. readRecord would just be func(*Record, sample []byte, remaining) error.

Probably also easier for the compiler to turn into good code since handle doesn't have to be a closure.

r.mu.Lock()
defer r.mu.Unlock()

Expand Down Expand Up @@ -171,9 +232,7 @@ func (r *Reader) ReadInto(rec *Record) error {
}

for {
err := r.ring.readRecord(rec)
// Not using errors.Is which is quite a bit slower
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: please keep the comment.

// For a tight loop it might make a difference
err := read()
if err == errBusy {
continue
}
Expand Down
31 changes: 31 additions & 0 deletions ringbuf/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,3 +384,34 @@ func BenchmarkReadInto(b *testing.B) {
}
}
}

func TestRecordsIterator(t *testing.T) {
testutils.SkipOnOldKernel(t, "5.8", "BPF ring buffer")

prog, events := mustOutputSamplesProg(t,
sampleMessage{size: 5, flags: 0},
sampleMessage{size: 7, flags: 0},
)
mustRun(t, prog)

rd, err := NewReader(events)
if err != nil {
t.Fatal(err)
}
defer rd.Close()

var seen [][]byte
for rec, err := range rd.Records() {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This needs a test that Reader.Close() stops the iterator without yielding io.ErrClosed. Using iter.Pull might be the easiest way to achieve that.

if err != nil {
t.Fatalf("iteration error: %v", err)
}
seen = append(seen, append([]byte(nil), rec.RawSample...))
if len(seen) == 2 {
break
}
}

qt.Assert(t, qt.Equals(len(seen), 2))
qt.Assert(t, qt.DeepEquals(seen[0], []byte{1, 2, 3, 4, 4}))
qt.Assert(t, qt.DeepEquals(seen[1], []byte{1, 2, 3, 4, 4, 3, 2}))
}
42 changes: 27 additions & 15 deletions ringbuf/ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,16 +41,17 @@ func (rr *ringReader) AvailableBytes() uint64 {
return uint64(prod - cons)
}

// Read a record from an event ring.
func (rr *ringReader) readRecord(rec *Record) error {
// readSample returns a zero-copy view into the next sample, together with the
// consumer position that should be stored to release the data.
func (rr *ringReader) readSample() (sample []byte, remaining int, nextCons uintptr, err error) {
prod := atomic.LoadUintptr(rr.prod_pos)
cons := atomic.LoadUintptr(rr.cons_pos)

for {
if remaining := prod - cons; remaining == 0 {
return errEOR
return nil, 0, 0, errEOR
} else if remaining < sys.BPF_RINGBUF_HDR_SZ {
return fmt.Errorf("read record header: %w", io.ErrUnexpectedEOF)
return nil, 0, 0, fmt.Errorf("read record header: %w", io.ErrUnexpectedEOF)
}

// read the len field of the header atomically to ensure a happens before
Expand All @@ -65,15 +66,15 @@ func (rr *ringReader) readRecord(rec *Record) error {
// the next sample in the ring is not committed yet so we
// exit without storing the reader/consumer position
// and start again from the same position.
return errBusy
return nil, 0, 0, errBusy
}

cons += sys.BPF_RINGBUF_HDR_SZ

// Data is always padded to 8 byte alignment.
dataLenAligned := uintptr(internal.Align(header.dataLen(), 8))
if remaining := prod - cons; remaining < dataLenAligned {
return fmt.Errorf("read sample data: %w", io.ErrUnexpectedEOF)
return nil, 0, 0, fmt.Errorf("read sample data: %w", io.ErrUnexpectedEOF)
}

start = cons & rr.mask
Expand All @@ -87,15 +88,26 @@ func (rr *ringReader) readRecord(rec *Record) error {
continue
}

if n := header.dataLen(); cap(rec.RawSample) < n {
rec.RawSample = make([]byte, n)
} else {
rec.RawSample = rec.RawSample[:n]
}
end := int(start) + header.dataLen()
return rr.ring[start:end], int(prod - cons), cons, nil
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reslicing to reduce cap could already happen here.

}
}

copy(rec.RawSample, rr.ring[start:])
rec.Remaining = int(prod - cons)
atomic.StoreUintptr(rr.cons_pos, cons)
return nil
// Read a record from an event ring, copying the sample into the provided Record.
func (rr *ringReader) readRecord(rec *Record) error {
sample, remaining, nextCons, err := rr.readSample()
if err != nil {
return err
}

if n := len(sample); cap(rec.RawSample) < n {
rec.RawSample = make([]byte, n)
} else {
rec.RawSample = rec.RawSample[:n]
}

copy(rec.RawSample, sample)
rec.Remaining = remaining
atomic.StoreUintptr(rr.cons_pos, nextCons)
return nil
}