forked from cilium/ebpf
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathreader.go
More file actions
260 lines (221 loc) · 6.65 KB
/
reader.go
File metadata and controls
260 lines (221 loc) · 6.65 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
package ringbuf
import (
"errors"
"fmt"
"os"
"sync"
"time"
"unsafe"
"github.com/cilium/ebpf"
"github.com/cilium/ebpf/internal/platform"
"github.com/cilium/ebpf/internal/sys"
)
var (
	// ErrClosed is returned once the Reader has been closed. It aliases
	// os.ErrClosed so callers can match it with errors.Is.
	ErrClosed = os.ErrClosed

	// errEOR signals that the consumer has caught up with the producer
	// (end of ring); readWait uses it to go back to waiting on the poller.
	errEOR = errors.New("end of ring")

	// errBusy signals that the next sample has been reserved by the
	// producer but not yet committed.
	errBusy = errors.New("sample not committed yet")
)
// poller abstracts platform-specific event notification.
type poller interface {
	// Wait blocks until the ring may have data or the deadline expires.
	// A zero deadline means no deadline (see Reader.SetDeadline).
	Wait(deadline time.Time) error
	// Flush wakes up a blocked Wait so that pending samples can be
	// drained (see Reader.Flush).
	Flush() error
	// Close releases the poller and interrupts a blocked Wait.
	Close() error
}
// eventRing abstracts platform-specific ring buffer memory access.
type eventRing interface {
	// size returns the size of the ring buffer in bytes.
	size() int
	// AvailableBytes returns the amount of unread data currently in the
	// ring, using atomic loads of the producer and consumer positions.
	AvailableBytes() uint64
	// readRecord reads the next sample into rec, copying it out of the
	// ring. Returns errEOR when the ring is drained and errBusy when the
	// next sample has not been committed yet.
	readRecord(rec *Record) error
	// readRecordUnsafe is like readRecord but points rec.RawSample
	// directly into the mmapped region instead of copying
	// (see Reader.ReadUnsafe).
	readRecordUnsafe(rec *Record) error
	// advance moves the consumer position past all records read so far,
	// releasing their space back to the producer (see Reader.Commit).
	advance()
	// Close unmaps/releases the ring memory.
	Close() error
}
// ringbufHeader from 'struct bpf_ringbuf_hdr' in kernel/bpf/ringbuf.c
type ringbufHeader struct {
	// Len holds the sample length with the kernel's BUSY/DISCARD flag
	// bits OR-ed in; see isBusy, isDiscard and dataLen.
	Len uint32
	_   uint32 // pg_off, only used by kernel internals
}

// ringbufHeaderSize is the number of bytes the kernel prepends to every sample.
const ringbufHeaderSize = int(unsafe.Sizeof(ringbufHeader{}))
// isBusy reports whether the producer has reserved this sample but has not
// yet submitted or discarded it.
func (rh *ringbufHeader) isBusy() bool {
	busy := rh.Len & sys.BPF_RINGBUF_BUSY_BIT
	return busy != 0
}
// isDiscard reports whether the producer discarded this sample, meaning its
// payload must be skipped rather than delivered.
func (rh *ringbufHeader) isDiscard() bool {
	discarded := rh.Len & sys.BPF_RINGBUF_DISCARD_BIT
	return discarded != 0
}
// dataLen returns the length of the sample payload in bytes, with the
// kernel's BUSY and DISCARD flag bits masked out.
func (rh *ringbufHeader) dataLen() int {
	const flags = sys.BPF_RINGBUF_BUSY_BIT | sys.BPF_RINGBUF_DISCARD_BIT
	return int(rh.Len &^ uint32(flags))
}
// Record contains a single sample read from the ring buffer, plus metadata
// about the state of the ring at the time it was read.
type Record struct {
	// RawSample holds the sample payload as written by the BPF program.
	RawSample []byte

	// The minimum number of bytes remaining in the ring buffer after this Record has been read.
	Remaining int

	// Set by the zero-copy read path (readRecordUnsafe) to prevent
	// readRecord from reusing RawSample as a write destination
	// (it points into read-only mmap memory).
	isReadOnly bool
}
// Reader allows reading bpf_ringbuf_output
// from user space.
type Reader struct {
	// poller wakes readWait up when the kernel signals new data.
	poller poller

	// mu protects read/write access to the Reader structure
	mu sync.Mutex

	// ring is the mmapped ring buffer; set to nil by Close.
	ring eventRing
	// haveData is true while the ring is believed to still hold unread
	// samples, letting readWait skip the poller until the ring is drained.
	haveData bool
	// deadline bounds how long Read/ReadInto block; the zero value means
	// no deadline (see SetDeadline).
	deadline time.Time
	// bufferSize caches ring.size() so BufferSize needs no lock.
	bufferSize int
	// pendingErr holds a deadline/flush error deferred by readWait until
	// the ring has been fully drained.
	pendingErr error
}
// NewReader creates a new BPF ringbuf reader.
func NewReader(ringbufMap *ebpf.Map) (*Reader, error) {
	mapType := ringbufMap.Type()
	if mapType != ebpf.RingBuf && mapType != ebpf.WindowsRingBuf {
		return nil, fmt.Errorf("invalid Map type: %s", mapType)
	}

	// The kernel requires ring buffer sizes to be a power of two pages;
	// reject anything else up front.
	maxEntries := int(ringbufMap.MaxEntries())
	if maxEntries == 0 || maxEntries&(maxEntries-1) != 0 {
		return nil, fmt.Errorf("ringbuffer map size %d is zero or not a power of two", maxEntries)
	}

	pollfd, err := newPoller(ringbufMap.FD())
	if err != nil {
		return nil, err
	}

	ring, err := newRingBufEventRing(ringbufMap.FD(), maxEntries)
	if err != nil {
		// Don't leak the poller on partial construction.
		pollfd.Close()
		return nil, fmt.Errorf("failed to create ringbuf ring: %w", err)
	}

	reader := &Reader{
		poller:     pollfd,
		ring:       ring,
		bufferSize: ring.size(),
		// On Windows, the wait handle is only set when the reader is created,
		// so we miss any wakeups that happened before.
		// Do an opportunistic read to get any pending samples.
		haveData: platform.IsWindows,
	}
	return reader, nil
}
// Close frees resources used by the reader.
//
// It interrupts calls to Read.
func (r *Reader) Close() error {
	// Closing the poller first wakes up any goroutine blocked in Read.
	switch err := r.poller.Close(); {
	case err == nil:
		// Continue below to release the ring.
	case errors.Is(err, os.ErrClosed):
		// Close was already called; treat as a no-op.
		return nil
	default:
		return err
	}

	// Acquire the lock. This ensures that Read isn't running.
	r.mu.Lock()
	defer r.mu.Unlock()

	ring := r.ring
	r.ring = nil
	if ring == nil {
		return nil
	}
	return ring.Close()
}
// SetDeadline controls how long Read and ReadInto will block waiting for samples.
//
// Passing a zero time.Time will remove the deadline.
func (r *Reader) SetDeadline(t time.Time) {
	r.mu.Lock()
	r.deadline = t
	r.mu.Unlock()
}
// Read the next record from the BPF ringbuf.
//
// Calling [Close] interrupts the method with [os.ErrClosed]. Calling [Flush]
// makes it return all records currently in the ring buffer, followed by [ErrFlushed].
//
// Returns [os.ErrDeadlineExceeded] if a deadline was set and after all records
// have been read from the ring.
//
// See [ReadInto] for a more efficient version of this method.
func (r *Reader) Read() (Record, error) {
	rec := Record{}
	if err := r.ReadInto(&rec); err != nil {
		return rec, err
	}
	return rec, nil
}
// ReadInto is like Read except that it allows reusing Record and associated buffers.
func (r *Reader) ReadInto(rec *Record) error {
	copyOut := func() error {
		return r.ring.readRecord(rec)
	}
	return r.readWait(copyOut)
}
// ReadUnsafe is like [Reader.ReadInto], but returns a zero-copy slice into
// the ring buffer's memory-mapped region instead of copying. The slice is
// valid until [Reader.Commit].
//
// RawSample points into read-only memory and must not be written to. If the
// Record is later passed to [Reader.ReadInto], the existing buffer will not be
// reused for the same reason; a new allocation will occur instead.
//
// Does not advance the consumer position. Call [Reader.Commit] after processing
// to release space. Must not be mixed with [Reader.Read] or [Reader.ReadInto]
// between Commit calls, or an ErrNotCommitted error will be returned.
func (r *Reader) ReadUnsafe(rec *Record) error {
	zeroCopy := func() error {
		return r.ring.readRecordUnsafe(rec)
	}
	return r.readWait(zeroCopy)
}
// Commit advances the consumer position, releasing ring buffer space from
// preceding [Reader.ReadUnsafe] calls. Slices from ReadUnsafe are invalid
// after this. No-op if there are no pending reads.
func (r *Reader) Commit() {
	r.mu.Lock()
	defer r.mu.Unlock()

	if ring := r.ring; ring != nil {
		ring.advance()
	}
}
// readWait blocks until the ring has data, then invokes read() repeatedly
// until it returns something other than errBusy or errEOR.
//
// State machine notes:
//   - haveData tracks whether the ring is believed to still hold unread
//     samples, so the poller is only consulted after the ring is drained.
//   - pendingErr defers a deadline/flush error until the ring has been
//     fully drained, so samples submitted with BPF_RB_NO_WAKEUP are not lost.
//
// Returns ErrClosed (wrapped) if the Reader was closed, otherwise whatever
// read() or the poller returned.
func (r *Reader) readWait(read func() error) error {
	r.mu.Lock()
	defer r.mu.Unlock()

	// Close sets ring to nil while holding mu, so this check is reliable.
	if r.ring == nil {
		return fmt.Errorf("ringbuffer: %w", ErrClosed)
	}

	for {
		if !r.haveData {
			// The ring was drained on a previous iteration; surface the
			// deadline/flush error that was deferred at that point.
			if pe := r.pendingErr; pe != nil {
				r.pendingErr = nil
				return pe
			}

			err := r.poller.Wait(r.deadline)
			if errors.Is(err, os.ErrDeadlineExceeded) || errors.Is(err, ErrFlushed) {
				// Ignoring this for reading a valid entry after timeout or flush.
				// This can occur if the producer submitted to the ring buffer
				// with BPF_RB_NO_WAKEUP.
				r.pendingErr = err
			} else if err != nil {
				return err
			}
			r.haveData = true
		}

		for {
			err := read()
			// Not using errors.Is which is quite a bit slower
			// For a tight loop it might make a difference
			if err == errBusy {
				// NOTE(review): this busy-spins until the producer commits
				// the in-flight sample — confirm that is intentional.
				continue
			}
			if err == errEOR {
				// Ring drained: go back to waiting on the poller.
				r.haveData = false
				break
			}
			return err
		}
	}
}
// BufferSize returns the size in bytes of the ring buffer
func (r *Reader) BufferSize() int {
	// bufferSize is set once in NewReader and never mutated, so no lock
	// is required here.
	return r.bufferSize
}
// Flush unblocks Read/ReadInto and successive Read/ReadInto calls will return pending samples at this point,
// until you receive a ErrFlushed error.
func (r *Reader) Flush() error {
	// Delegates to the platform poller, which wakes up a blocked Wait.
	return r.poller.Flush()
}
// AvailableBytes returns the amount of data available to read in the ring buffer in bytes.
//
// Returns 0 after the Reader has been closed.
func (r *Reader) AvailableBytes() int {
	// Don't need to acquire the lock here since the implementation of AvailableBytes
	// performs atomic loads on the producer and consumer positions.
	// Taking r.mu would also block behind a Read that is waiting on the
	// poller, so we deliberately avoid it.
	//
	// Guard against r.ring being nil: Close sets it to nil, and the
	// original code panicked when called after Close.
	ring := r.ring
	if ring == nil {
		return 0
	}
	return int(ring.AvailableBytes())
}