forked from cilium/ebpf
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathreader.go
More file actions
264 lines (224 loc) · 6.63 KB
/
reader.go
File metadata and controls
264 lines (224 loc) · 6.63 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
package ringbuf
import (
"errors"
"fmt"
"os"
"sync"
"sync/atomic"
"time"
"unsafe"
"iter"
"github.com/cilium/ebpf"
"github.com/cilium/ebpf/internal/platform"
"github.com/cilium/ebpf/internal/sys"
)
var (
	// ErrClosed is returned by reads once the Reader has been closed.
	ErrClosed = os.ErrClosed
	// errEOR reports that the consumer caught up with the producer and
	// there are no further records in the ring (end of ring).
	errEOR = errors.New("end of ring")
	// errBusy reports that the next record's busy bit is still set: the
	// producer has reserved but not yet committed it.
	errBusy = errors.New("sample not committed yet")
)
// ringbufHeader from 'struct bpf_ringbuf_hdr' in kernel/bpf/ringbuf.c
type ringbufHeader struct {
	// Len carries the sample length in its low bits plus the kernel's
	// BUSY and DISCARD flag bits; see isBusy, isDiscard and dataLen.
	Len uint32
	_   uint32 // pg_off, only used by kernel internals
}

// ringbufHeaderSize is the size in bytes of the header that precedes
// every sample in the ring.
const ringbufHeaderSize = int(unsafe.Sizeof(ringbufHeader{}))
// isBusy reports whether the producer is still writing this record
// (BPF_RINGBUF_BUSY_BIT set in the length word).
func (rh *ringbufHeader) isBusy() bool {
	return rh.Len&sys.BPF_RINGBUF_BUSY_BIT != 0
}

// isDiscard reports whether the producer discarded this record; its
// payload must be skipped rather than handed to the caller.
func (rh *ringbufHeader) isDiscard() bool {
	return rh.Len&sys.BPF_RINGBUF_DISCARD_BIT != 0
}

// dataLen returns the sample payload length with the BUSY and DISCARD
// flag bits masked out.
func (rh *ringbufHeader) dataLen() int {
	return int(rh.Len & ^uint32(sys.BPF_RINGBUF_BUSY_BIT|sys.BPF_RINGBUF_DISCARD_BIT))
}
// Record contains a single sample read from the ring buffer.
type Record struct {
	// RawSample contains the raw bytes of a ringbuf record.
	//
	// When obtained via [Reader.Records], RawSample is a zero-copy view into the
	// underlying mmap. It is only valid until the iterator yields the next
	// record or terminates. Callers must copy the data if they need to retain it.
	RawSample []byte
	// The minimum number of bytes remaining in the ring buffer after this Record has been read.
	Remaining int
}
// Reader allows reading bpf_ringbuf_output
// from user space.
type Reader struct {
	// poller wakes the reader when the kernel signals new data.
	poller *poller
	// mu protects read/write access to the Reader structure
	mu sync.Mutex
	// ring is the mmap'd ring buffer; nil once Close has run.
	ring *ringbufEventRing
	// haveData is set after a poller wakeup and cleared when the ring is
	// drained (errEOR), so reads skip the poller while data may remain.
	haveData bool
	// deadline bounds poller.Wait; the zero value means no deadline.
	deadline time.Time
	// bufferSize caches ring.size() at construction time.
	bufferSize int
	// pendingErr holds a deadline/flush error deferred until the ring
	// has been fully drained.
	pendingErr error
}
// NewReader creates a new BPF ringbuf reader.
//
// The map must be a ring buffer map whose MaxEntries is a non-zero
// power of two.
func NewReader(ringbufMap *ebpf.Map) (*Reader, error) {
	mapType := ringbufMap.Type()
	if mapType != ebpf.RingBuf && mapType != ebpf.WindowsRingBuf {
		return nil, fmt.Errorf("invalid Map type: %s", mapType)
	}

	size := int(ringbufMap.MaxEntries())
	if size == 0 || size&(size-1) != 0 {
		return nil, fmt.Errorf("ringbuffer map size %d is zero or not a power of two", size)
	}

	epoll, err := newPoller(ringbufMap.FD())
	if err != nil {
		return nil, err
	}

	ring, err := newRingBufEventRing(ringbufMap.FD(), size)
	if err != nil {
		epoll.Close()
		return nil, fmt.Errorf("failed to create ringbuf ring: %w", err)
	}

	return &Reader{
		poller:     epoll,
		ring:       ring,
		bufferSize: ring.size(),
		// On Windows, the wait handle is only set when the reader is created,
		// so we miss any wakeups that happened before.
		// Do an opportunistic read to get any pending samples.
		haveData: platform.IsWindows,
	}, nil
}
// Close frees resources used by the reader.
//
// It interrupts calls to Read.
func (r *Reader) Close() error {
	// Close the poller first so a concurrent Read is woken up and
	// releases r.mu before we try to take it.
	if err := r.poller.Close(); err != nil {
		if errors.Is(err, os.ErrClosed) {
			// Already closed; Close is idempotent.
			return nil
		}
		return err
	}

	// Acquire the lock. This ensures that Read isn't running.
	r.mu.Lock()
	defer r.mu.Unlock()

	if r.ring == nil {
		return nil
	}
	err := r.ring.Close()
	r.ring = nil
	return err
}
// SetDeadline controls how long Read and ReadInto will block waiting for samples.
//
// Passing a zero time.Time will remove the deadline.
func (r *Reader) SetDeadline(t time.Time) {
	r.mu.Lock()
	r.deadline = t
	r.mu.Unlock()
}
// Read the next record from the BPF ringbuf.
//
// Calling [Close] interrupts the method with [os.ErrClosed]. Calling [Flush]
// makes it return all records currently in the ring buffer, followed by [ErrFlushed].
//
// Returns [os.ErrDeadlineExceeded] if a deadline was set and after all records
// have been read from the ring.
//
// See [ReadInto] for a more efficient version of this method.
func (r *Reader) Read() (Record, error) {
	rec := Record{}
	if err := r.ReadInto(&rec); err != nil {
		return rec, err
	}
	return rec, nil
}
// ReadInto is like Read except that it allows reusing Record and associated buffers.
func (r *Reader) ReadInto(rec *Record) error {
	readOne := func() error {
		return r.ring.readRecord(rec)
	}
	return r.readLocked(readOne)
}
// Records iterates over records in the reader until [Reader.Close] is called.
//
// Record.RawSample is only valid until the next call to the iterator. Callers
// must copy the data if it needs to outlive the current iteration.
//
// This convenience wrapper allocates a single Record once. To fully avoid
// allocations, use [Reader.RecordsInto] and pass in a reusable Record.
func (r *Reader) Records() iter.Seq2[*Record, error] {
	var rec Record
	return r.RecordsInto(&rec)
}
// RecordsInto is like Records but allows reusing the Record and associated buffers.
func (r *Reader) RecordsInto(rec *Record) iter.Seq2[*Record, error] {
	return func(yield func(*Record, error) bool) {
		var (
			sample    []byte
			remaining int
			nextCons  uintptr
		)
		for {
			readErr := r.readLocked(func() error {
				var err error
				sample, remaining, nextCons, err = r.ring.readSample()
				return err
			})
			if readErr != nil {
				yield(nil, readErr)
				return
			}

			// Limit cap to len so append can't write past the record and corrupt the ring.
			rec.RawSample = sample[:len(sample):len(sample)]
			rec.Remaining = remaining

			keepGoing := yield(rec, nil)
			// Only advance the consumer position once the caller is done
			// with the zero-copy view into the ring.
			atomic.StoreUintptr(r.ring.cons_pos, nextCons)
			if !keepGoing {
				return
			}
		}
	}
}
// readLocked drives the polling / data-availability loop shared by Record reads.
//
// read is called repeatedly under r.mu and must return errEOR once the ring
// is drained and errBusy while the next record is not yet committed. Any
// other result (including nil) is returned to the caller as-is.
func (r *Reader) readLocked(read func() error) error {
	r.mu.Lock()
	defer r.mu.Unlock()

	if r.ring == nil {
		return fmt.Errorf("ringbuffer: %w", ErrClosed)
	}

	for {
		if !r.haveData {
			// Surface a deadline/flush error deferred by a previous
			// iteration, now that the ring has been drained.
			if pe := r.pendingErr; pe != nil {
				r.pendingErr = nil
				return pe
			}
			err := r.poller.Wait(r.deadline)
			if errors.Is(err, os.ErrDeadlineExceeded) || errors.Is(err, ErrFlushed) {
				// Ignoring this for reading a valid entry after timeout or flush.
				// This can occur if the producer submitted to the ring buffer
				// with BPF_RB_NO_WAKEUP.
				r.pendingErr = err
			} else if err != nil {
				return err
			}
			r.haveData = true
		}
		for {
			err := read()
			if err == errBusy {
				// Spin until the producer commits the in-flight record.
				continue
			}
			if err == errEOR {
				// Ring drained: go back to waiting on the poller.
				r.haveData = false
				break
			}
			// nil (a record was read) or a real error.
			return err
		}
	}
}
// BufferSize returns the size in bytes of the ring buffer.
//
// The value is captured once in NewReader and never changes afterwards.
func (r *Reader) BufferSize() int {
	return r.bufferSize
}
// Flush unblocks Read/ReadInto; successive Read/ReadInto calls will return the
// samples pending at this point, until an ErrFlushed error is received.
func (r *Reader) Flush() error {
	return r.poller.Flush()
}
// AvailableBytes returns the amount of data available to read in the ring
// buffer in bytes.
//
// Returns 0 after the reader has been closed.
func (r *Reader) AvailableBytes() int {
	// Close sets r.ring to nil under r.mu, so reading the pointer without
	// the lock risks a nil dereference (and a data race) when racing with
	// or called after Close. Take the lock to observe a stable pointer;
	// the producer/consumer position reads inside AvailableBytes are
	// themselves atomic.
	r.mu.Lock()
	defer r.mu.Unlock()

	if r.ring == nil {
		return 0
	}
	return int(r.ring.AvailableBytes())
}