-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathtest_perf_event.py
More file actions
189 lines (159 loc) · 5.17 KB
/
test_perf_event.py
File metadata and controls
189 lines (159 loc) · 5.17 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
import time
import numpy as np
from numba import njit
import pytest
from py_perf_event import (
Measure,
Hardware,
Cache,
CacheId,
CacheOp,
CacheResult,
Raw,
measure,
measure_until_full_read,
PartialRead,
)
def test_measure_class():
    """
    The ``Measure`` API allows getting counters for any code.

    The instruction counter is read twice while enabled: once after a single
    workload and again after an identical second workload.  The test expects
    the second reading to be roughly double the first.
    """
    m = Measure([Hardware.INSTRUCTIONS])
    m.enable()
    try:
        sum(range(1_000_000))
        [instructions] = m.read().measurements
        sum(range(1_000_000))
        [instructions2] = m.read().measurements
    finally:
        # Always pair enable() with disable(), even when a read raises, so a
        # still-enabled counter is not left behind for whatever runs next.
        m.disable()
    assert 1.5 < instructions2 / instructions < 2.5
def test_measure_function():
    """
    The function-style API runs a given callable with the given arguments and
    returns the counters collected while it ran.

    Summing a range ten times longer is expected to take about ten times as
    many instructions.
    """
    events = [Hardware.INSTRUCTIONS]

    (small_count,) = measure_until_full_read(events, sum, range(1_000_000))
    (large_count,) = measure_until_full_read(events, sum, range(10_000_000))

    # At least one instruction per summed element:
    assert small_count > 1_000_000

    growth = large_count / small_count
    assert 7 < growth < 15
def test_cache_ops():
    """
    ``Cache`` events can be measured as well.

    The same scattered traversal is measured over a small and a large list,
    and the last-level cache read/miss counters of the two are compared.
    """

    # Visit the elements in a scattered order rather than front to back:
    def traverse(values):
        total = 0
        size = len(values)
        for i in range(size):
            total += values[(i * 123763537) % size]
        return total

    # Order matters: results come back as (reads, misses).
    events = [
        Cache(CacheId.LL, CacheOp.READ, CacheResult.ACCESS),
        Cache(CacheId.LL, CacheOp.READ, CacheResult.MISS),
    ]

    small_list = list(range(1_000))
    small_reads, small_misses = measure_until_full_read(
        events, traverse, small_list
    )

    large_list = list(range(10_000_000))
    large_reads, large_misses = measure_until_full_read(
        events, traverse, large_list
    )

    assert small_reads < 1000
    assert small_misses <= small_reads
    assert large_reads > 1000 * small_reads

    large_miss_ratio = large_misses / large_reads
    assert large_miss_ratio > 0.2
def test_raw():
    """
    ``Raw()`` events get measured.

    TODO: This test is model-specific, only tested on i7-12700K.
    """

    @njit
    def double(arr):
        out = np.empty(arr.shape, dtype=arr.dtype)
        # Should auto-vectorize to SIMD;
        for idx in range(len(arr)):
            out[idx] = 2 * arr[idx]
        return out

    # SIMD on float64:
    simd_f64 = [Raw(0x4C7), Raw(0x10C7)]

    f64_data = np.ones((1_000_000,), dtype=np.float64)
    f32_data = np.ones((1_000_000,), dtype=np.float32)

    # Call once per dtype before measuring (presumably so numba's compilation
    # for each input type is not part of the measured runs).
    for warmup_input in (f64_data, f32_data):
        double(warmup_input)

    # float64 input: the float64 SIMD events should fire many times.
    f64_counts = measure_until_full_read(simd_f64, double, f64_data)
    with_f64 = sum(f64_counts)
    assert with_f64 > (1_000_000 / 8) * 0.5

    # float32 input: the same float64 SIMD events should barely register.
    f32_counts = measure_until_full_read(simd_f64, double, f32_data)
    with_f32 = sum(f32_counts)
    assert with_f32 < 100
def spin_and_do_nested_measurements_n_times(max_count: int, counter: list):
    """
    Spin, optionally inside a nested perf measurement.

    ``counter`` carries state between calls.  While it holds no more than
    ``max_count`` entries, a call appends one entry to it and spins with its
    own six-event cache ``Measure`` enabled.  Once it holds more than
    ``max_count`` entries, a call just spins.

    Returns the number of entries in ``counter`` after the call.
    """

    def spin():
        for _ in range(1_000_000):
            pass

    # Enough nested calls already recorded: plain busy loop, no measurement.
    if len(counter) > max_count:
        spin()
        return len(counter)

    # Record this call, then spin under a nested measurement.
    counter.append(None)
    nested_events = [
        Cache(CacheId.LL, CacheOp.READ, CacheResult.ACCESS),
        Cache(CacheId.LL, CacheOp.READ, CacheResult.MISS),
        Cache(CacheId.L1D, CacheOp.READ, CacheResult.ACCESS),
        Cache(CacheId.L1D, CacheOp.READ, CacheResult.MISS),
        Cache(CacheId.LL, CacheOp.WRITE, CacheResult.ACCESS),
        Cache(CacheId.L1D, CacheOp.WRITE, CacheResult.ACCESS),
    ]
    measurer = Measure(nested_events)
    measurer.enable()
    spin()
    measurer.disable()
    return len(counter)
# Events the tests ask ``measure()`` / ``measure_until_full_read()`` to count
# while running the helper above.  They are stored as an attribute on the
# helper function so the tests can reach them through the helper's name.
spin_and_do_nested_measurements_n_times.other_measurements = [
    Hardware.CPU_CYCLES,
    Hardware.INSTRUCTIONS,
    Hardware.CACHE_REFERENCES,
    Hardware.CACHE_MISSES,
    Hardware.BRANCH_INSTRUCTIONS,
    Hardware.BRANCH_MISSES,
]
def test_partial_read():
    """
    A partial read surfaces as a ``PartialRead`` exception.

    One way this can happen is requesting too many measurements at once, so
    that the measurement rotates between counters; the test relies on that to
    provoke the edge case.
    """
    helper = spin_and_do_nested_measurements_n_times

    with pytest.raises(PartialRead) as excinfo:
        measure(helper.other_measurements, helper, 3, [])

    # The events were enabled for longer than they were actually counting.
    partial = excinfo.value
    assert partial.read.time_enabled_ns > partial.read.time_running_ns
def test_measure_until_full_read():
    """
    ``measure_until_full_read()`` keeps retrying until a read is full, up to
    a retry limit.
    """
    helper = spin_and_do_nested_measurements_n_times
    events = helper.other_measurements

    # With many nested measurements requested, the retry limit is hit first:
    with pytest.raises(PartialRead):
        measure_until_full_read(events, helper, 30, [])

    time.sleep(0.5)

    # With only a few, a full read arrives before the limit, so no exception.
    results = measure_until_full_read(events, helper, 3, [])

    # Didn't do extra measurements so didn't go over the threshold:
    assert len(results) == 6