-
Notifications
You must be signed in to change notification settings - Fork 851
Expand file tree
/
Copy pathring.go
More file actions
154 lines (129 loc) · 3.88 KB
/
ring.go
File metadata and controls
154 lines (129 loc) · 3.88 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
package ringbuf
import (
"errors"
"fmt"
"io"
"sync/atomic"
"unsafe"
"github.com/cilium/ebpf/internal"
"github.com/cilium/ebpf/internal/sys"
)
// ErrNotCommitted is returned by readRecord when a previous zero-copy read
// has not yet been committed via advance, i.e. hasPending is still true.
var ErrNotCommitted = errors.New("zero-copy records not yet committed")
// ringReader reads records out of an mmap'd BPF ring buffer whose data
// pages are mapped twice back to back, so a record wrapping the end of
// the buffer is still readable as one contiguous slice.
type ringReader struct {
	// These point into mmap'ed memory and must be accessed atomically.
	prod_pos, cons_pos *uintptr
	// mask is size()-1; ANDing a logical position with it yields an
	// offset into ring. Valid because the usable size is a power of two
	// (see newRingReader).
	mask uintptr
	// ring is the double-mapped data region; its cap is twice the
	// usable size.
	ring []byte
	// Logical consumer position for deferred zero-copy reads.
	// Only valid when hasPending is true.
	pendingCons uintptr
	hasPending  bool
}
// newRingReader wraps a double-mapped ring buffer region together with
// pointers to the kernel-shared consumer and producer positions.
func newRingReader(cons_ptr, prod_ptr *uintptr, ring []byte) *ringReader {
	// Only half of the mapping is distinct memory; that half is always
	// a power of two, so half-1 works as an offset mask.
	half := cap(ring) / 2
	return &ringReader{
		prod_pos: prod_ptr,
		cons_pos: cons_ptr,
		mask:     uintptr(half - 1),
		ring:     ring,
	}
}
// size returns the usable capacity of the ring buffer in bytes.
//
// To be able to wrap around data, data pages in ring buffers are mapped
// twice in a single contiguous virtual region, so the usable size is
// half the size of the mmaped region.
func (rr *ringReader) size() int {
	return cap(rr.ring) >> 1
}
// AvailableBytes returns the amount of data currently waiting to be
// read from the ring buffer.
func (rr *ringReader) AvailableBytes() uint64 {
	producer := atomic.LoadUintptr(rr.prod_pos)
	consumer := atomic.LoadUintptr(rr.cons_pos)
	// Positions grow monotonically; unsigned subtraction handles wrap.
	return uint64(producer - consumer)
}
// readRecord is like the zero-copy read path, but copies the sample
// into rec.RawSample and commits the consumer position immediately.
//
// Returns ErrNotCommitted if a previous zero-copy read is still pending.
func (rr *ringReader) readRecord(rec *Record) error {
	if rr.hasPending {
		return ErrNotCommitted
	}

	// Reuse the caller's buffer, unless it aliases the mmap'd ring
	// from an earlier zero-copy read — copying into that would corrupt
	// the buffer, so start from scratch instead.
	var dst []byte
	if !rec.isReadOnly {
		dst = rec.RawSample
	}

	// Commit the consumer position (and clear the read-only marker)
	// regardless of whether the read below succeeds.
	defer func() {
		rec.isReadOnly = false
		rr.advance()
	}()

	if err := rr.readRecordUnsafe(rec); err != nil {
		return err
	}

	want := len(rec.RawSample)
	if cap(dst) >= want {
		dst = dst[:want]
	} else {
		dst = make([]byte, want)
	}
	copy(dst, rec.RawSample)
	rec.RawSample = dst
	return nil
}
// Sets rec.RawSample to a slice of the mmap'd ring buffer memory.
// Does not advance the consumer position; call advance separately.
//
// Returns errEOR once the ring is drained and errBusy when the next
// record has not been committed by the producer yet; in both cases the
// pending position is left so a retry resumes at the same record.
// Discarded records are skipped transparently.
func (rr *ringReader) readRecordUnsafe(rec *Record) error {
	prod := atomic.LoadUintptr(rr.prod_pos)
	// Resume from an uncommitted zero-copy position if there is one,
	// otherwise from the shared consumer position.
	cons := rr.pendingCons
	if !rr.hasPending {
		cons = atomic.LoadUintptr(rr.cons_pos)
	}

	for {
		if remaining := prod - cons; remaining == 0 {
			return errEOR
		} else if remaining < sys.BPF_RINGBUF_HDR_SZ {
			return fmt.Errorf("read record header: %w", io.ErrUnexpectedEOF)
		}

		// Read the len field of the header atomically to ensure a happens-before
		// relationship with the xchg in the kernel. Without this we may see len
		// without BPF_RINGBUF_BUSY_BIT before the written data is visible.
		// See https://github.com/torvalds/linux/blob/v6.8/kernel/bpf/ringbuf.c#L484
		start := cons & rr.mask
		// Named rawLen rather than len to avoid shadowing the builtin.
		rawLen := atomic.LoadUint32((*uint32)((unsafe.Pointer)(&rr.ring[start])))
		header := ringbufHeader{Len: rawLen}

		if header.isBusy() {
			// The next sample in the ring is not committed yet so we
			// exit without storing the reader/consumer position
			// and start again from the same position.
			return errBusy
		}

		cons += sys.BPF_RINGBUF_HDR_SZ

		// Data is always padded to 8 byte alignment.
		dataLenAligned := uintptr(internal.Align(header.dataLen(), 8))
		if remaining := prod - cons; remaining < dataLenAligned {
			return fmt.Errorf("read sample data: %w", io.ErrUnexpectedEOF)
		}

		start = cons & rr.mask
		cons += dataLenAligned

		if header.isDiscard() {
			// When the record header indicates that the data should be
			// discarded, we skip it by just updating the pending position
			// to the next record.
			rr.pendingCons = cons
			rr.hasPending = true
			continue
		}

		n := header.dataLen()
		// RawSample aliases the mmap'd memory until advance is called;
		// mark it read-only so readRecord won't copy into it.
		rec.RawSample = rr.ring[start : start+uintptr(n)]
		rec.Remaining = int(prod - cons)
		rec.isReadOnly = true
		rr.pendingCons = cons
		rr.hasPending = true
		return nil
	}
}
// Commits the pending consumer position from readRecordZeroCopy calls.
// advance publishes the pending consumer position recorded by a
// zero-copy read, handing the consumed records back to the kernel.
// It is a no-op when nothing is pending.
func (rr *ringReader) advance() {
	if !rr.hasPending {
		return
	}
	atomic.StoreUintptr(rr.cons_pos, rr.pendingCons)
	rr.hasPending = false
}