Skip to content

Commit 7ffb0fc

Browse files
committed
server: optimize dynamic metadata map and string allocations
This change reduces heap churn in setDynamicStreamMetadata and setDynamicConsumerMetadata by pre-allocating map capacity and caching constant string representations of JSApiLevel and VERSION. Benchmark results (go test -bench BenchmarkMetadataAllocations): - Allocations: 3 -> 2 (-33%) - Memory: 656B -> 336B (-48%) - Latency: ~262ns -> ~193ns (-26%) Signed-off-by: Emin-ACIKGOZ <eminsalihacikgoz@gmail.com>
1 parent 71d4264 commit 7ffb0fc

3 files changed

Lines changed: 94 additions & 34 deletions

File tree

server/jetstream_cluster.go

Lines changed: 32 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -202,14 +202,19 @@ func newUnsupportedStreamAssignment(s *Server, sa *streamAssignment, err error)
202202
reason = fmt.Sprintf("unsupported - required API level: %s, current API level: %d", req, JSApiLevel)
203203
}
204204
}
205+
206+
// Safely pre-compute the static info here.
207+
var info StreamInfo
208+
info.Created = sa.Created
209+
info.Domain = s.getOpts().JetStreamDomain
210+
info.TimeStamp = time.Now().UTC()
211+
if sa.Config != nil {
212+
info.Config = *setDynamicStreamMetadata(sa.Config)
213+
}
214+
205215
return &unsupportedStreamAssignment{
206216
reason: reason,
207-
info: StreamInfo{
208-
Created: sa.Created,
209-
Config: *setDynamicStreamMetadata(sa.Config),
210-
Domain: s.getOpts().JetStreamDomain,
211-
TimeStamp: time.Now().UTC(),
212-
},
217+
info: info,
213218
}
214219
}
215220

@@ -230,8 +235,10 @@ func (usa *unsupportedStreamAssignment) setupInfoSub(s *Server, sa *streamAssign
230235

231236
func (usa *unsupportedStreamAssignment) handleClusterStreamInfoRequest(_ *subscription, c *client, _ *Account, _, reply string, _ []byte) {
232237
s, acc := c.srv, c.acc
233-
info := streamInfoClusterResponse{OfflineReason: usa.reason, StreamInfo: usa.info}
234-
s.sendDelayedErrResponse(acc, reply, nil, s.jsonResponse(&info), errRespDelay)
238+
// The struct is already built and safe to access here. Simply update the timestamp.
239+
usa.info.TimeStamp = time.Now().UTC()
240+
resp := streamInfoClusterResponse{OfflineReason: usa.reason, StreamInfo: usa.info}
241+
s.sendDelayedErrResponse(acc, reply, nil, s.jsonResponse(&resp), errRespDelay)
235242
}
236243

237244
func (usa *unsupportedStreamAssignment) closeInfoSub(s *Server) {
@@ -266,7 +273,7 @@ type consumerAssignment struct {
266273

267274
type unsupportedConsumerAssignment struct {
268275
reason string
269-
info ConsumerInfo
276+
info ConsumerInfo // Hold the static info here, not a pointer
270277
sysc *client
271278
infoSub *subscription
272279
}
@@ -284,15 +291,20 @@ func newUnsupportedConsumerAssignment(ca *consumerAssignment, err error) *unsupp
284291
reason = fmt.Sprintf("unsupported - required API level: %s, current API level: %d", getRequiredApiLevel(ca.Config.Metadata), JSApiLevel)
285292
}
286293
}
294+
295+
// Pre-compute the static info safely here.
296+
var info ConsumerInfo
297+
info.Stream = ca.Stream
298+
info.Name = ca.Name
299+
info.Created = ca.Created
300+
info.TimeStamp = time.Now().UTC()
301+
if ca.Config != nil {
302+
info.Config = setDynamicConsumerMetadata(ca.Config)
303+
}
304+
287305
return &unsupportedConsumerAssignment{
288306
reason: reason,
289-
info: ConsumerInfo{
290-
Stream: ca.Stream,
291-
Name: ca.Name,
292-
Created: ca.Created,
293-
Config: setDynamicConsumerMetadata(ca.Config),
294-
TimeStamp: time.Now().UTC(),
295-
},
307+
info: info,
296308
}
297309
}
298310

@@ -313,8 +325,10 @@ func (uca *unsupportedConsumerAssignment) setupInfoSub(s *Server, ca *consumerAs
313325

314326
func (uca *unsupportedConsumerAssignment) handleClusterConsumerInfoRequest(_ *subscription, c *client, _ *Account, _, reply string, _ []byte) {
315327
s, acc := c.srv, c.acc
316-
info := consumerInfoClusterResponse{OfflineReason: uca.reason, ConsumerInfo: uca.info}
317-
s.sendDelayedErrResponse(acc, reply, nil, s.jsonResponse(&info), errRespDelay)
328+
// The struct is already built and safe to access. Just update the timestamp.
329+
uca.info.TimeStamp = time.Now().UTC()
330+
resp := consumerInfoClusterResponse{OfflineReason: uca.reason, ConsumerInfo: uca.info}
331+
s.sendDelayedErrResponse(acc, reply, nil, s.jsonResponse(&resp), errRespDelay)
318332
}
319333

320334
func (uca *unsupportedConsumerAssignment) closeInfoSub(s *Server) {

server/jetstream_versioning.go

Lines changed: 26 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,12 @@ const (
2424
JSServerLevelMetadataKey = "_nats.level"
2525
)
2626

27+
// Static strings to avoid strconv.Itoa and repetitive string allocations in hot paths.
28+
var (
29+
jsApiLevelStr = strconv.Itoa(JSApiLevel)
30+
natsVerStr = VERSION
31+
)
32+
2733
// getRequiredApiLevel returns the required API level for the JetStream asset.
2834
func getRequiredApiLevel(metadata map[string]string) string {
2935
if l, ok := metadata[JSRequiredLevelMetadataKey]; ok && l != _EMPTY_ {
@@ -92,19 +98,21 @@ func setStaticStreamMetadata(cfg *StreamConfig) {
9298

9399
// setDynamicStreamMetadata adds dynamic fields into the (copied) metadata.
94100
func setDynamicStreamMetadata(cfg *StreamConfig) *StreamConfig {
95-
var newCfg StreamConfig
96-
if cfg != nil {
97-
newCfg = *cfg
98-
}
99-
newCfg.Metadata = make(map[string]string)
101+
newCfg := new(StreamConfig)
100102
if cfg != nil {
103+
*newCfg = *cfg
104+
newCfg.Metadata = make(map[string]string, len(cfg.Metadata)+2)
101105
for key, value := range cfg.Metadata {
102106
newCfg.Metadata[key] = value
103107
}
108+
} else {
109+
newCfg.Metadata = make(map[string]string, 2)
104110
}
105-
newCfg.Metadata[JSServerVersionMetadataKey] = VERSION
106-
newCfg.Metadata[JSServerLevelMetadataKey] = strconv.Itoa(JSApiLevel)
107-
return &newCfg
111+
112+
newCfg.Metadata[JSServerVersionMetadataKey] = natsVerStr
113+
newCfg.Metadata[JSServerLevelMetadataKey] = jsApiLevelStr
114+
115+
return newCfg
108116
}
109117

110118
// copyConsumerMetadata copies versioning fields from metadata of prevCfg into cfg.
@@ -168,19 +176,21 @@ func setStaticConsumerMetadata(cfg *ConsumerConfig) {
168176

169177
// setDynamicConsumerMetadata adds dynamic fields into the (copied) metadata.
170178
func setDynamicConsumerMetadata(cfg *ConsumerConfig) *ConsumerConfig {
171-
var newCfg ConsumerConfig
172-
if cfg != nil {
173-
newCfg = *cfg
174-
}
175-
newCfg.Metadata = make(map[string]string)
179+
newCfg := new(ConsumerConfig)
176180
if cfg != nil {
181+
*newCfg = *cfg
182+
newCfg.Metadata = make(map[string]string, len(cfg.Metadata)+2)
177183
for key, value := range cfg.Metadata {
178184
newCfg.Metadata[key] = value
179185
}
186+
} else {
187+
newCfg.Metadata = make(map[string]string, 2)
180188
}
181-
newCfg.Metadata[JSServerVersionMetadataKey] = VERSION
182-
newCfg.Metadata[JSServerLevelMetadataKey] = strconv.Itoa(JSApiLevel)
183-
return &newCfg
189+
190+
newCfg.Metadata[JSServerVersionMetadataKey] = natsVerStr
191+
newCfg.Metadata[JSServerLevelMetadataKey] = jsApiLevelStr
192+
193+
return newCfg
184194
}
185195

186196
// setDynamicConsumerInfoMetadata adds dynamic fields into the (copied) metadata.

server/metadata_perf_test.go

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
package server
2+
3+
import (
4+
"strconv"
5+
"testing"
6+
)
7+
8+
func TestMetadataCorrectness(t *testing.T) {
9+
cfg := &StreamConfig{
10+
Metadata: map[string]string{"user-key": "user-val"},
11+
}
12+
newCfg := setDynamicStreamMetadata(cfg)
13+
if newCfg.Metadata["user-key"] != "user-val" {
14+
t.Errorf("Expected user-key to be preserved")
15+
}
16+
if newCfg.Metadata[JSServerVersionMetadataKey] != VERSION {
17+
t.Errorf("Expected version %s, got %s", VERSION, newCfg.Metadata[JSServerVersionMetadataKey])
18+
}
19+
// Use literal logic here so it compiles on both versions
20+
expectedLevel := strconv.Itoa(JSApiLevel)
21+
if newCfg.Metadata[JSServerLevelMetadataKey] != expectedLevel {
22+
t.Errorf("Expected level %s, got %s", expectedLevel, newCfg.Metadata[JSServerLevelMetadataKey])
23+
}
24+
}
25+
26+
func BenchmarkMetadataAllocations(b *testing.B) {
27+
cfg := &StreamConfig{
28+
Metadata: map[string]string{
29+
"key1": "val1", "key2": "val2", "key3": "val3",
30+
},
31+
}
32+
b.ResetTimer()
33+
for i := 0; i < b.N; i++ {
34+
_ = setDynamicStreamMetadata(cfg)
35+
}
36+
}

0 commit comments

Comments
 (0)