
🔥 feat: Add SSE (Server-Sent Events) middleware#4196

Open
vinod-morya wants to merge 1 commit into gofiber:main from vinod-morya:feat/middleware-sse

Conversation

@vinod-morya

Summary

Adds middleware/sse — production-grade Server-Sent Events built natively for Fiber's fasthttp architecture. This is the only Go SSE implementation with proper client disconnect detection on fasthttp (via SendStreamWriter + w.Flush() error).

Follows the discussion in #4194 where @ReneWerner87 and @gaby agreed this belongs in Fiber core as middleware/sse.

Based on fibersse, which is already running in production at PersonaCart (300+ concurrent connections, multi-tenant SaaS) and is listed on awesome-go.

Usage

handler, hub := sse.NewWithHub(sse.Config{
    OnConnect: func(c fiber.Ctx, conn *sse.Connection) error {
        conn.Topics = []string{"notifications"}
        tenantID, ok := c.Locals("tenant_id").(string)
        if !ok {
            return fiber.ErrUnauthorized // guard the type assertion instead of panicking
        }
        conn.Metadata["tenant_id"] = tenantID
        return nil
    },
})
app.Get("/events", handler)

// From any handler or worker — replaces polling
hub.Invalidate("orders", order.ID, "created")

Features

  • Hub pattern: single-goroutine event loop with topic-based routing
  • 3 priority lanes: Instant (P0), Batched (P1), Coalesced/last-writer-wins (P2)
  • NATS-style wildcards: notifications.*, analytics.>
  • Adaptive throttling: slow clients automatically get longer flush intervals
  • Connection groups: publish by metadata (tenant_id, user_id)
  • Cache invalidation: Invalidate() and Signal(), a one-line polling replacement
  • Domain events: DomainEvent(), Progress(), Complete(), BatchDomainEvents()
  • Auth helpers: JWT and one-time ticket auth (EventSource can't send headers)
  • Metrics: JSON and Prometheus endpoints
  • Last-Event-ID replay: pluggable Replayer interface with an in-memory default
  • Fan-out: bridge Redis/NATS pub/sub to SSE
  • Graceful drain: Kubernetes-style shutdown with Retry-After

Follows Fiber Conventions

  • New(config ...Config) fiber.Handler + NewWithHub(config ...Config) (fiber.Handler, *Hub) — same pattern as session's NewWithStore()
  • Config struct with Next field, ConfigDefault, configDefault() helper
  • Uses github.com/gofiber/fiber/v3/log (not log/slog)
  • Zero external dependencies — only uses packages already in Fiber's go.mod
  • golangci-lint passes clean (0 issues) against Fiber's .golangci.yml

Benchmarks (Apple M4 Max)

Benchmark_SSE_Publish-16             321,270    3,780 ns/op    139 B/op    3 allocs/op
Benchmark_SSE_TopicMatch-16        5,274,810      229 ns/op     64 B/op    2 allocs/op
Benchmark_SSE_TopicMatch_Exact-16 14,051,781       89 ns/op      0 B/op    0 allocs/op
Benchmark_SSE_MarshalEvent-16        253,378    4,767 ns/op    345 B/op   10 allocs/op
Benchmark_SSE_GenerateID-16        1,991,384      606 ns/op     32 B/op    1 allocs/op

Quality

  • 50 tests + 7 benchmarks + 4 runnable examples
  • 70.2% coverage (the remaining ~30% is SSE streaming internals requiring a real HTTP client)
  • go test -race passes clean
  • golangci-lint passes clean (0 issues)
  • go vet clean

File Structure

middleware/sse/
├── config.go          Config + ConfigDefault + configDefault()
├── sse.go             New(), NewWithHub(), Hub, run loop
├── connection.go      Per-client connection, write loop
├── event.go           Event types, SSE wire format, marshaling
├── coalescer.go       P1/P2 event buffering
├── topic.go           NATS-style wildcard matching
├── throttle.go        Adaptive per-connection flush interval
├── auth.go            JWT + ticket auth helpers
├── invalidation.go    Cache invalidation helpers
├── domain_event.go    Domain event, progress, batch helpers
├── fanout.go          Redis/NATS pub/sub bridge
├── replayer.go        Last-Event-ID replay (pluggable)
├── metrics.go         Prometheus + JSON metrics
├── stats.go           Hub statistics
├── sse_test.go        50 tests + 7 benchmarks
├── example_test.go    4 runnable examples
└── README.md          Fiber-format documentation

cc @ReneWerner87 @gaby @efectn

@vinod-morya vinod-morya requested a review from a team as a code owner April 7, 2026 08:57
@welcome

welcome bot commented Apr 7, 2026

Thanks for opening this pull request! 🎉 Please check out our contributing guidelines. If you need help or want to chat with us, join us on Discord https://gofiber.io/discord

@coderabbitai
Contributor

coderabbitai bot commented Apr 7, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.


Walkthrough

Adds a new Fiber SSE middleware package implementing an event Hub, Connection model, auth (JWT & one‑time tickets), event priorities with coalescing/batching, replay store, external fan‑out, metrics/Prometheus handlers, domain/invalidation helpers, tests, examples, and README documentation.

Changes

Cohort / File(s) Summary
Core SSE & Connection
middleware/sse/sse.go, middleware/sse/connection.go
New Fiber handler constructors (New, NewWithHub), Hub broker, connection lifecycle, write/heartbeat loop, backpressure, graceful shutdown, Publish/SetPaused/Shutdown/Stats APIs.
Event Model & Wire Format
middleware/sse/event.go
Adds Event/MarshaledEvent, Priority enum, global event ID generator, data marshaling, SSE serialization (WriteTo), and comment/retry helpers.
Buffering & Coalescing
middleware/sse/coalescer.go
Per-connection coalescer supporting batched and keyed coalesced buffers with deterministic flush ordering and pending counts.
Adaptive Throttling
middleware/sse/throttle.go
Unexported adaptive throttler to compute per-connection flush intervals from buffer saturation and manage last-flush state.
Configuration
middleware/sse/config.go
Config struct with lifecycle hooks/replayer and tuning fields, ConfigDefault, and normalization helper.
Topic Matching
middleware/sse/topic.go
NATS-style dot-delimited topic matching (*, >) and helpers to match a connection’s subscriptions.
Authentication & Tickets
middleware/sse/auth.go
JWTAuth OnConnect helper, TicketStore interface, MemoryTicketStore with TTL eviction and Close/finalizer, TicketAuth, and IssueTicket.
Invalidation & Domain APIs
middleware/sse/invalidation.go, middleware/sse/domain_event.go
Invalidate/Signal helpers, DomainEvent/Progress/Complete, BatchDomainEvents, tenant grouping, coalesce-keying and structured payloads.
Replay Storage
middleware/sse/replayer.go
Replayer interface and MemoryReplayer ring buffer (MaxEvents, TTL) with topic-filtered Replay and wildcard matching.
External Fan‑Out
middleware/sse/fanout.go
PubSubSubscriber interface, FanOut/FanOutMulti bridge with retry loop, transform hook, and event defaulting.
Metrics & Stats
middleware/sse/metrics.go, middleware/sse/stats.go
MetricsSnapshot/ConnectionInfo, HubStats, JSON MetricsHandler, PrometheusHandler, lifetime counters and per-topic/event aggregates.
Tests, Examples & Docs
middleware/sse/sse_test.go, middleware/sse/example_test.go, middleware/sse/README.md
Extensive tests and benchmarks, executable examples (auth, invalidation, progress, tickets), and README documenting API, config, Hub/Hub API, and examples.

Sequence Diagram(s)

sequenceDiagram
    participant Client as SSE Client
    participant Handler as Fiber Handler
    participant Replayer as Replayer
    participant Hub as Event Hub
    participant Conn as Connection

    Client->>Handler: GET /events (Last-Event-ID, topics)
    Handler->>Replayer: Replay(lastEventID, topics)
    Replayer-->>Handler: []MarshaledEvent
    Handler->>Hub: register connection (id, topics, metadata)
    Handler-->>Client: SSE headers + replayed events

    Note over Hub,Conn: Hub routes published events to matching connections
    Handler->>Hub: Publish(event)
    Hub->>Conn: trySend(marshaledEvent)
    Conn->>Conn: buffer (coalesce/batch) or drop
    Conn-->>Client: flushed SSE frames (id,event,data,retry)

    Client-->>Handler: disconnect
    Handler->>Hub: unregister connection
sequenceDiagram
    participant ExtPubSub as External Pub/Sub
    participant FanOut as FanOut Goroutine
    participant Hub as Event Hub
    participant Conn as Connection
    participant Client as SSE Client

    Note over FanOut: started via FanOut(config)
    FanOut->>ExtPubSub: Subscribe(ctx)
    ExtPubSub-->>FanOut: Payload
    FanOut->>FanOut: Transform? / build Event
    FanOut->>Hub: Publish(event)
    Hub->>Conn: deliver to matching topics
    Conn-->>Client: SSE delivery
sequenceDiagram
    participant Client as SSE Client
    participant Handler as Fiber Handler
    participant Auth as Auth OnConnect
    participant Store as TicketStore
    participant Conn as Connection

    Client->>Handler: GET /events?ticket=TK
    Handler->>Auth: TicketAuth(store, parse)
    Auth->>Store: GetDel(TK)
    Store-->>Auth: value (one-time)
    Auth->>Conn: set Metadata & Topics
    Handler->>Hub: register connection
    Handler-->>Client: SSE stream established

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related issues

  • gofiber/contrib#1771 — Matches the fibersse proposal; this PR implements a full Fiber-native SSE package addressing that request.
  • Proposal: Add SSE (Server-Sent Events) package #4194 — Implements SSE middleware (Hub, Connection, coalescing, auth, replay, fan-out, metrics) consistent with the feature discussion.

Suggested reviewers

  • gaby
  • sixcolors
  • ReneWerner87
  • efectn

Poem

🐰 I nibble bytes and stitch a stream,

topics hop and coalesce in gleam.
Tickets, JWTs, and heartbeats hum,
Hubs deliver carrots, one by one.
Hooray — the SSE garden blooms!

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

  • Docstring Coverage ⚠️ Warning: docstring coverage is 18.70%, below the required 80.00% threshold. Resolution: write docstrings for the functions missing them.
✅ Passed checks (2 passed)
  • Title check ✅ Passed: the title clearly and concisely identifies the main change (adding SSE middleware), is specific, uses a single sentence, and accurately represents the changeset.
  • Description check ✅ Passed: the description is comprehensive and well-structured (summary, usage, features, conventions, benchmarks, quality, file structure), though it does not follow the description template's explicit checkboxes for Benchmarks, Documentation Update, Changelog, Migration Guide, API Alignment, API Longevity, and Examples.


@ReneWerner87 ReneWerner87 added this to v3 Apr 7, 2026
@ReneWerner87 ReneWerner87 added this to the v3 milestone Apr 7, 2026
Contributor

@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request introduces a native Server-Sent Events (SSE) middleware for Fiber v3, featuring a central Hub for connection management, event priorities (Instant, Batched, Coalesced), NATS-style topic wildcards, and adaptive throttling. The implementation includes built-in JWT and ticket-based authentication helpers, along with Prometheus metrics support. Feedback identifies a critical bug in the wildcard matching logic for the > token and incorrect label escaping in the Prometheus handler. Additionally, there are documentation discrepancies regarding MaxLifetime defaults and non-existent configuration fields. Suggestions for improvement include making the JWT prefix check case-insensitive, preventing potential goroutine leaks in the ticket store, optimizing the memory replayer's ring buffer, and refining the event writing logic to remove redundancy and improve type safety for control signals.

Comment on lines +20 to +44
func topicMatch(pattern, topic string) bool {
	if !strings.ContainsAny(pattern, "*>") {
		return pattern == topic
	}

	patParts := strings.Split(pattern, ".")
	topParts := strings.Split(topic, ".")

	for i, pp := range patParts {
		switch pp {
		case ">":
			return i < len(topParts)
		case "*":
			if i >= len(topParts) {
				return false
			}
		default:
			if i >= len(topParts) || pp != topParts[i] {
				return false
			}
		}
	}

	return len(patParts) == len(topParts)
}
Contributor


critical

The topicMatch function has a critical bug in its handling of the > wildcard. The NATS specification states that > must be the last token in a subject and matches one or more trailing tokens. The current implementation incorrectly returns true if > appears anywhere in the pattern. For example, topicMatch("a.>.c", "a.b.c") will incorrectly return true.

This logic needs to be revised to correctly handle the > wildcard, ensuring it only matches when it is the final token in the pattern.
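For illustration, one way to enforce that constraint while keeping the rest of the matching logic intact; a sketch, not the committed fix:

```go
package main

import "strings"

// topicMatch reports whether topic matches a NATS-style pattern.
// "*" matches exactly one token; ">" is valid only as the final
// pattern token and matches one or more trailing topic tokens.
func topicMatch(pattern, topic string) bool {
	if !strings.ContainsAny(pattern, "*>") {
		return pattern == topic
	}
	patParts := strings.Split(pattern, ".")
	topParts := strings.Split(topic, ".")
	for i, pp := range patParts {
		switch pp {
		case ">":
			// Reject patterns where ">" is not the last token,
			// and require at least one remaining topic token.
			return i == len(patParts)-1 && i < len(topParts)
		case "*":
			if i >= len(topParts) {
				return false
			}
		default:
			if i >= len(topParts) || pp != topParts[i] {
				return false
			}
		}
	}
	return len(patParts) == len(topParts)
}

func main() {
	println(topicMatch("a.>.c", "a.b.c")) // false: ">" is not the last token
	println(topicMatch("a.>", "a.b.c"))   // true
}
```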

@vinod-morya
Author

Addressed all review feedback in c39263a:

  • Critical: > wildcard bug (a.>.c matching a.b.c) → Fixed: > is now enforced as the last token only. Added regression tests.
  • Prometheus label escaping (entire string was escaped) → Fixed: escapePromLabelValue() now escapes only the value part.
  • README: Logger field doesn't exist → Removed from the config table; the middleware uses fiber/v3/log directly.
  • README: MaxLifetime says 0 = unlimited → Fixed: docs now say -1 for unlimited (matches the code).
  • Bearer prefix case-sensitive → Fixed: now uses strings.EqualFold per RFC 6750.
  • WriteTo redundant empty-data branch → Simplified: strings.Split("", "\n") returns [""], which handles it.
  • TicketAuth README example incomplete → Added a working json.Unmarshal example.

golangci-lint passes clean (0 issues), go test -race passes, go vet clean.
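On the RFC 6750 point: authentication scheme names are case-insensitive, so the prefix check must compare without regard to case. A minimal sketch of such a check; the helper name bearerToken is hypothetical, not this PR's code:

```go
package main

import "strings"

// bearerToken extracts the token from an Authorization header value,
// accepting any casing of the "Bearer" scheme (RFC 6750: scheme
// comparison is case-insensitive).
func bearerToken(header string) (string, bool) {
	const prefix = "Bearer "
	if len(header) < len(prefix) || !strings.EqualFold(header[:len(prefix)], prefix) {
		return "", false
	}
	return strings.TrimSpace(header[len(prefix):]), true
}

func main() {
	tok, ok := bearerToken("bearer my-jwt")
	println(ok, tok)
}
```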

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 15

🧹 Nitpick comments (2)
middleware/sse/coalescer.go (1)

26-32: Consider initializing batched slice to reduce allocations.

The batched slice is left as nil, causing the first append to allocate. Pre-allocating with a reasonable initial capacity could reduce early allocations for active connections.

Optional: Pre-allocate batched slice
 func newCoalescer(flushInterval time.Duration) *coalescer {
 	return &coalescer{
 		coalesced:     make(map[string]MarshaledEvent),
+		batched:       make([]MarshaledEvent, 0, 16),
 		flushInterval: flushInterval,
 	}
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@middleware/sse/coalescer.go` around lines 26 - 32, The coalescer constructor
newCoalescer leaves the batched slice nil which causes a heap allocation on the
first append; update newCoalescer to initialize the coalescer.batched field with
a small pre-allocated capacity (e.g., make([]MarshaledEvent, 0, 8) or another
reasonable initial capacity) so early appends avoid allocations while leaving
length zero; locate the coalescer struct instantiation in newCoalescer and set
the batched field accordingly.
middleware/sse/topic.go (1)

20-44: Pattern validation: > wildcard not enforced as last token.

The docstring states > must be the last token, but the implementation doesn't validate this. A malformed pattern like "a.>.b" would match "a.x" and return true early, ignoring the trailing .b. Since patterns are developer-controlled, this is unlikely to cause issues in practice, but adding validation or a doc note about undefined behavior for invalid patterns would improve robustness.

Optional: Add pattern validation
 func topicMatch(pattern, topic string) bool {
 	if !strings.ContainsAny(pattern, "*>") {
 		return pattern == topic
 	}

 	patParts := strings.Split(pattern, ".")
 	topParts := strings.Split(topic, ".")

+	// Validate that ">" only appears as the last token
+	for i, pp := range patParts {
+		if pp == ">" && i != len(patParts)-1 {
+			return false // Invalid pattern: ">" must be last
+		}
+	}
+
 	for i, pp := range patParts {
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@middleware/sse/topic.go` around lines 20 - 44, The topicMatch function
currently allows the '>' wildcard anywhere; modify topicMatch to validate that
if a patParts element equals ">" it must be the final token—if ">" appears at an
index other than len(patParts)-1, return false (treat as invalid pattern) before
proceeding to matching; use the existing patParts/topParts variables and keep
the rest of the matching logic unchanged so malformed patterns like "a.>.b" no
longer match.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@middleware/sse/domain_event.go`:
- Around line 109-131: BatchDomainEvents builds a single Event with multiple
Topics and the full specs payload, which causes subscribers of one topic to
receive unrelated specs; update BatchDomainEvents to split or filter the payload
per audience by creating one Event per topic (or per audience group) instead of
one multi-topic Event: iterate topics (from topicSet) and for each topic build
an Event with Topics set to a single-topic slice and Data set to only the specs
whose Resource matches that topic (include Group when tenantID != ""), then call
h.Publish for each per-topic Event; keep the Event struct, BatchDomainEvents and
h.Publish usage to locate and implement this change.

In `@middleware/sse/event.go`:
- Around line 61-97: marshalEvent is writing an error string directly into JSON
(in both the json.Marshaler branch and the default branch) which can produce
invalid JSON if err.Error() contains quotes or backslashes; instead build the
error payload using the JSON encoder (e.g., json.Marshal on a struct or map like
map[string]string{"error": err.Error()}) and assign the resulting string to
me.Data so the error message is properly escaped and valid JSON. Ensure you
replace the two occurrences where me.Data is set with
fmt.Sprintf(`{"error":"marshal failed: %s"}`, err) to a safe json.Marshal-based
construction and keep other behavior (ID generation, types) unchanged.

In `@middleware/sse/fanout.go`:
- Around line 110-112: The current assignment in fanout.go (if event.Priority ==
0 && cfg.Priority != 0 { event.Priority = cfg.Priority }) overwrites an explicit
intent by Transform to set PriorityInstant (0) because 0 is used for both
"instant" and "unset"; change the representation so "unset" is distinguishable
from the explicit zero—either make event.Priority a pointer/optional (e.g.,
*Priority) or add a boolean like PrioritySet on the event and update Transform
to set PrioritySet when it explicitly chooses PriorityInstant; then change the
merge logic in the fanout code to only apply cfg.Priority when PrioritySet is
false (or event.Priority == nil).

In `@middleware/sse/metrics.go`:
- Around line 139-145: The Prometheus label building currently passes the full
fragment like `topic="`+topic+`"` into appendProm, causing the label quotes to
be escaped when topic/event type contains special chars; instead, escape only
the label value before concatenating the quotes. Update the loops that call
appendProm (e.g., the one iterating snap.ConnectionsByTopic using variable topic
and the one over snap.EventsByType using eventType) to first call the existing
or a new helper to escape Prometheus label values (escape backslashes, double
quotes, and newlines), then build the label string as `topic="` + escapedTopic +
`"` and `type="` + escapedEventType + `"` and pass that to appendProm; make the
same change for the other occurrences flagged (around the 151-155 and 158-185
ranges).

In `@middleware/sse/README.md`:
- Around line 126-132: The example in the README uses sse.TicketAuth in the
sse.NewWithHub(sse.Config{ OnConnect: sse.TicketAuth(store, func(value string)
(map[string]string, []string, error) { return metadata, topics, nil }), })
snippet but returns undefined metadata and topics; update the example inside the
OnConnect callback (used with TicketAuth) to either (a) show a concrete parsing
implementation that extracts metadata (map[string]string) and topics ([]string)
from the stored string `value` and returns them, or (b) replace the return with
a clear comment like `// parse value into metadata and topics then return` so
readers know this is pseudocode; mention the relevant symbols TicketAuth,
OnConnect, and sse.NewWithHub in the change so the fix is applied to that
handler example.

In `@middleware/sse/replayer.go`:
- Around line 10-15: The Replay implementation currently loses group/tenant
context because the Replayer interface only persists topics; update the contract
to include group/tenant context (or exclude grouped events) so tenant-scoped
events cannot be replayed across tenants. Specifically, modify the Replayer
interface (methods Store and Replay) and the persisted MarshaledEvent to include
a Group/Tenant identifier (e.g., add groupID string parameter to Store and
require groupID in Replay or embed groupID in MarshaledEvent), then update all
implementations to persist and filter by that groupID during replay;
alternatively, if you prefer exclusion, change Store to skip persisting events
where MarshaledEvent.Group is set and document the behavior. Ensure you update
both Store(event MarshaledEvent, topics []string) and Replay(lastEventID string,
topics []string) signatures/usages (or their equivalents) so replay filtering
uses group/tenant context.
- Around line 97-100: Remove the unused local variable topicSet in the Replay
function: delete the topicSet creation block (the make(map[string]struct{},
len(topics)) and the for loop that populates it) so the code uses the existing
topics slice directly when calling matchesAnyTopicWithWildcards; ensure no other
references to topicSet remain in Replay.

In `@middleware/sse/sse_test.go`:
- Around line 386-390: The test is using a fixed time.Sleep after hub.Publish
which flakes under load; replace the hard-coded 50ms wait with an
eventual/polling assertion that repeatedly checks hub.Stats() until
EventsPublished becomes 1 (or a timeout occurs). Locate the test snippet around
hub.Publish(Event{...}) and the require.Equal(t, int64(1),
stats.EventsPublished) and implement a retry loop or use a testing helper (e.g.,
testify.Eventually or a small for/select with time.After) to poll hub.Stats()
until the condition is true, failing the test if the timeout elapses; remove the
time.Sleep call.
- Around line 632-643: The test currently treats a client-side timeout on
app.Test as evidence the SSE stream opened; instead modify the test around the
app.Test call (the block that creates req via http.NewRequest and calls
app.Test) to wait for an explicit server-side success signal—either hook into
the SSE hub's connection callback (use an OnConnect notification) or assert
hub.Stats().ActiveConnections == 1—before relying on the client timeout; apply
the same change to both similar blocks in this file (the one around the req with
"Authorization: Bearer my-jwt" and the other at 669-678) so the test only
considers the stream opened after the explicit confirmation is observed.

In `@middleware/sse/sse.go`:
- Around line 139-156: The connection is being registered (hub.register <- conn)
before initStream finishes, allowing live events to be queued and causing
duplicate deliveries; change the flow so the connection is not sent to
hub.register until after initStream has completed its replay (i.e., call
hub.register only after initStream returns successfully), keep capturing
lastEventID as-is, and retain the defer that sends to hub.unregister and closes
conn so cleanup still runs if initStream fails.
- Around line 445-457: The marshaled event loses TTL/CreatedAt so expiry is only
checked pre-enqueue (in routeEvent) and not on delivery; fix by preserving
expiry info on the buffered representation and re-checking it before
flush/delivery: add CreatedAt and TTL fields to MarshaledEvent (or an expiry
timestamp), populate them in marshalEvent when called from routeEvent, and in
flushAll (and the other delivery paths around lines 529-579) re-evaluate
time.Since(CreatedAt) > TTL and drop expired items (increment
h.metrics.eventsDropped) instead of sending them.
- Around line 214-217: Make Shutdown idempotent by ensuring the shutdown channel
is closed only once: add a sync.Once field (e.g., shutdownOnce) to the Hub
struct and change Hub.Shutdown to call h.shutdownOnce.Do(func() {
h.draining.Store(true); close(h.shutdown) }); keep the draining.Store(true)
inside the Do to handle concurrent callers and return the same error semantics
(nil) after Do completes. Use the existing symbols Hub, Shutdown, h.shutdown and
h.draining and introduce shutdownOnce to coordinate single close.
- Around line 138-153: The hub register/unregister sends are currently
fire-and-forget and can race with hub shutdown; change the protocol so
registration is acknowledged and unregistration is safe: have hub.register
accept a request struct (e.g., containing conn and an ack channel) and block
until the hub confirms registration (so SendStreamWriter only proceeds once
run() has processed register), record a boolean like "registered" and only send
to hub.unregister if registered, and send unregister using a non-blocking select
that also checks hub shutdown/Done so the defer won't block if the hub has
exited; update the registration/unregistration call sites in SendStreamWriter
(the conn registration block and the deferred cleanup) and modify run() to read
the new request struct and send the ack.
- Around line 477-526: The current
matchConnections/matchWildcardConns/matchGroupConns logic ORs topic and group
selectors; instead implement intersection when both event.Topics and event.Group
are present: in matchConnections, if both selectors exist first collect
topic-matching connection IDs (including wildcard matches via matchWildcardConns
but limited to topic semantics), then filter that set by connMatchesGroup (or
vice versa); if only one selector is present keep the existing behavior. Update
how matchWildcardConns and matchGroupConns are used (or refactor into helpers)
so wildcardConns are considered as topic matches and group filtering is applied
only when event.Group is set, using the existing symbols connMatchesTopic,
connMatchesGroup, topicIndex, wildcardConns, connections, Event.Topics and
Event.Group.
- Around line 178-189: The Publish method's send on h.events can block when the
channel is full, contradicting the comment; change the send to a non-blocking
select that either sends to h.events and increments h.metrics.eventsPublished,
or on a default/drop branch increments a new or existing drop metric (e.g.,
h.metrics.eventsDropped.Add(1)) so callers won't stall under backpressure;
preserve the existing shutdown case (<-h.shutdown) in the select and update the
method comment to reflect the non-blocking/drop behavior; locate this logic in
Hub.Publish and update metrics usage accordingly.

---

Nitpick comments:
In `@middleware/sse/coalescer.go`:
- Around line 26-32: The coalescer constructor newCoalescer leaves the batched
slice nil which causes a heap allocation on the first append; update
newCoalescer to initialize the coalescer.batched field with a small
pre-allocated capacity (e.g., make([]MarshaledEvent, 0, 8) or another reasonable
initial capacity) so early appends avoid allocations while leaving length zero;
locate the coalescer struct instantiation in newCoalescer and set the batched
field accordingly.

In `@middleware/sse/topic.go`:
- Around line 20-44: The topicMatch function currently allows the '>' wildcard
anywhere; modify topicMatch to validate that if a patParts element equals ">" it
must be the final token—if ">" appears at an index other than len(patParts)-1,
return false (treat as invalid pattern) before proceeding to matching; use the
existing patParts/topParts variables and keep the rest of the matching logic
unchanged so malformed patterns like "a.>.b" no longer match.
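The non-blocking Publish suggested in the inline comments above could be sketched like this; field and metric names are illustrative, not the package's actual ones:

```go
package main

import "sync/atomic"

// Event is a stub standing in for the middleware's event type.
type Event struct{ Type string }

// Hub sketch: Publish never blocks the caller. If the hub's buffer is
// full, the event is dropped and counted rather than stalling producers.
type Hub struct {
	events          chan Event
	shutdown        chan struct{}
	eventsPublished atomic.Int64
	eventsDropped   atomic.Int64
}

func (h *Hub) Publish(e Event) {
	select {
	case <-h.shutdown:
		// Hub is draining; discard silently.
	case h.events <- e:
		h.eventsPublished.Add(1)
	default:
		// Backpressure: drop rather than block.
		h.eventsDropped.Add(1)
	}
}

func main() {
	h := &Hub{events: make(chan Event, 1), shutdown: make(chan struct{})}
	h.Publish(Event{Type: "a"}) // fills the one-slot buffer
	h.Publish(Event{Type: "b"}) // buffer full: dropped, caller not blocked
	println(h.eventsPublished.Load(), h.eventsDropped.Load())
}
```

The `default` branch is what makes the send non-blocking; without it, a full channel would stall every publisher until the hub's run loop catches up.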

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: 49cd8174-14c1-4a7d-8fd5-1d16aaa19b94

📥 Commits

Reviewing files that changed from the base of the PR and between 676b2f7 and f1d3a7c.

📒 Files selected for processing (17)
  • middleware/sse/README.md
  • middleware/sse/auth.go
  • middleware/sse/coalescer.go
  • middleware/sse/config.go
  • middleware/sse/connection.go
  • middleware/sse/domain_event.go
  • middleware/sse/event.go
  • middleware/sse/example_test.go
  • middleware/sse/fanout.go
  • middleware/sse/invalidation.go
  • middleware/sse/metrics.go
  • middleware/sse/replayer.go
  • middleware/sse/sse.go
  • middleware/sse/sse_test.go
  • middleware/sse/stats.go
  • middleware/sse/throttle.go
  • middleware/sse/topic.go

Comment on lines +109 to +131
// BatchDomainEvents publishes multiple domain events as a single SSE frame.
func (h *Hub) BatchDomainEvents(tenantID string, specs []DomainEventSpec) {
	if len(specs) == 0 {
		return
	}
	topicSet := make(map[string]struct{})
	for _, s := range specs {
		topicSet[s.Resource] = struct{}{}
	}
	topics := make([]string, 0, len(topicSet))
	for t := range topicSet {
		topics = append(topics, t)
	}
	batchEvt := Event{
		Type:     "batch",
		Topics:   topics,
		Data:     specs,
		Priority: PriorityInstant,
	}
	if tenantID != "" {
		batchEvt.Group = map[string]string{"tenant_id": tenantID}
	}
	h.Publish(batchEvt)
}
Contributor


⚠️ Potential issue | 🟠 Major

Mixed-topic batches leak unrelated specs to subscribers.

middleware/sse/sse.go delivers a multi-topic Event once to any connection that matches any listed topic. A client subscribed only to orders will still receive the full specs slice if the batch also includes customers. Split batches by audience or filter the payload per connection before marshaling.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@middleware/sse/domain_event.go` around lines 109 - 131, BatchDomainEvents
builds a single Event with multiple Topics and the full specs payload, which
causes subscribers of one topic to receive unrelated specs; update
BatchDomainEvents to split or filter the payload per audience by creating one
Event per topic (or per audience group) instead of one multi-topic Event:
iterate topics (from topicSet) and for each topic build an Event with Topics set
to a single-topic slice and Data set to only the specs whose Resource matches
that topic (include Group when tenantID != ""), then call h.Publish for each
per-topic Event; keep the Event struct, BatchDomainEvents and h.Publish usage to
locate and implement this change.
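For reference, the per-topic split the prompt describes can be sketched with simplified stand-in types. The real `Event` and `DomainEventSpec` shapes live in `middleware/sse` and carry more fields; this only illustrates the grouping logic:

```go
package main

import "fmt"

// Simplified stand-ins for the middleware's types (hypothetical shapes).
type DomainEventSpec struct {
	Resource string
	Action   string
}

type Event struct {
	Type   string
	Topics []string
	Data   []DomainEventSpec
}

// splitBatchByTopic builds one Event per topic so subscribers of a single
// topic never receive specs that belong to another topic.
func splitBatchByTopic(specs []DomainEventSpec) []Event {
	byTopic := make(map[string][]DomainEventSpec)
	for _, s := range specs {
		byTopic[s.Resource] = append(byTopic[s.Resource], s)
	}
	events := make([]Event, 0, len(byTopic))
	for topic, group := range byTopic {
		events = append(events, Event{
			Type:   "batch",
			Topics: []string{topic}, // single-topic audience
			Data:   group,           // only the specs for this topic
		})
	}
	return events
}

func main() {
	specs := []DomainEventSpec{
		{Resource: "orders", Action: "created"},
		{Resource: "customers", Action: "updated"},
		{Resource: "orders", Action: "deleted"},
	}
	for _, e := range splitBatchByTopic(specs) {
		fmt.Println(e.Topics[0], len(e.Data))
	}
}
```

The trade-off is one SSE frame per topic instead of one combined frame, which is why the author later documents the combined behavior as an intentional dashboard pattern.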

@vinod-morya
Author

Addressed remaining feedback in 64966ef:

| Feedback | Fix |
| --- | --- |
| Heartbeat magic string | Replaced with type-safe `heartbeatSignal` struct. Send channel is now `chan any` with type switch in `writeLoop` — no more sentinel strings. |
| MemoryReplayer ring buffer | Replaced slice append+reslice with fixed-size circular buffer. `Store()` is now O(1) with zero allocations once the buffer is full. |
| MemoryTicketStore goroutine leak | Added `runtime.SetFinalizer` to auto-close if caller forgets `Close()`. |
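The ring-buffer replacement described above can be sketched as follows. This is a simplified illustration with a string payload; the actual `MemoryReplayer` stores `MarshaledEvent`s and tracks topics:

```go
package main

import "fmt"

// ringBuffer is a fixed-size circular buffer: once full, Store overwrites
// the oldest entry in place with no further allocations.
type ringBuffer struct {
	buf   []string
	head  int // index of the next write
	count int // number of valid entries (<= len(buf))
}

func newRingBuffer(size int) *ringBuffer {
	return &ringBuffer{buf: make([]string, size)}
}

func (r *ringBuffer) Store(v string) {
	r.buf[r.head] = v
	r.head = (r.head + 1) % len(r.buf)
	if r.count < len(r.buf) {
		r.count++
	}
}

// Snapshot returns the buffered entries oldest-first.
func (r *ringBuffer) Snapshot() []string {
	out := make([]string, 0, r.count)
	start := (r.head - r.count + len(r.buf)) % len(r.buf)
	for i := 0; i < r.count; i++ {
		out = append(out, r.buf[(start+i)%len(r.buf)])
	}
	return out
}

func main() {
	r := newRingBuffer(3)
	for _, e := range []string{"e1", "e2", "e3", "e4"} {
		r.Store(e)
	}
	fmt.Println(r.Snapshot()) // oldest entry e1 was overwritten
}
```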

Re: Invalidate overlap with DomainEvent* — These are intentionally separate APIs for different use cases:

  • Invalidate("orders", "ord_1", "created") — 3 params, simple cache invalidation signal
  • DomainEvent("orders", "created", "ord_1", "t_1", hint) — 5 params, tenant-scoped with hints

Invalidate* is the 80% use case (simple, no tenant, no hints). DomainEvent is the full-featured version. Removing Invalidate* would force every simple call to pass empty strings for tenantID and nil for hints. Happy to consolidate if the team prefers — just let me know.

All checks: golangci-lint 0 issues, go test -race passes, go vet clean.

@vinod-morya
Author

Addressed all CodeRabbit critical and major findings in 8be8414:

Critical

| Finding | Fix |
| --- | --- |
| Topic+Group OR leaks across tenants | AND semantics when both set — connections must match topic AND group |
| Replay drops tenant filter | Group-scoped events excluded from replayer storage |
| Register/unregister blocks after shutdown | Channel sends wrapped with `select` + `hub.shutdown` |

Major

| Finding | Fix |
| --- | --- |
| Shutdown double-close panic | `sync.Once` wraps `close(h.shutdown)` |
| Publish blocks under backpressure | Added `default` drop case, updated doc |
| TTL lost in coalescer | `MarshaledEvent` now carries `CreatedAt`+`TTL`, re-checked in `flushAll()` |
| Register before replay = duplicates | Moved register to AFTER `initStream` completes |
| Marshal error invalid JSON | `json.Marshal(err.Error())` for proper escaping |
| Batch cross-topic delivery | Documented as intentional (dashboard pattern) |

Build, go test -race, golangci-lint — all clean.

Contributor

@coderabbitai coderabbitai bot left a comment


🧹 Nitpick comments (2)
middleware/sse/domain_event.go (1)

20-26: CoalesceKey is ineffective with PriorityInstant.

Per sse.go:565-579 (context snippet 2), CoalesceKey is only consulted for PriorityCoalesced events. Since this event uses PriorityInstant, the key at line 25 is never used—the event is dispatched immediately via trySend.

Either remove the dead field assignment, or if coalescing is desired for rapid duplicate invalidations, switch to PriorityCoalesced.

Option 1: Remove unused CoalesceKey
 	event := Event{
 		Type:        "invalidate",
 		Topics:      []string{resource},
 		Data:        evt,
 		Priority:    PriorityInstant,
-		CoalesceKey: "invalidate:" + resource + ":" + resourceID,
 	}
Option 2: If coalescing is intended, use PriorityCoalesced
 	event := Event{
 		Type:        "invalidate",
 		Topics:      []string{resource},
 		Data:        evt,
-		Priority:    PriorityInstant,
+		Priority:    PriorityCoalesced,
 		CoalesceKey: "invalidate:" + resource + ":" + resourceID,
 	}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@middleware/sse/domain_event.go` around lines 20 - 26, The Event being
constructed sets CoalesceKey but uses PriorityInstant, which never consults
CoalesceKey (trySend dispatches immediately); either remove the CoalesceKey
assignment from the Event literal to eliminate dead code, or change
PriorityInstant to PriorityCoalesced so the event will be handled by the
coalescing path; if choosing coalescing, ensure the Event (with CoalesceKey) is
enqueued/handled by the coalescer rather than sent via trySend so coalescing
actually occurs.
middleware/sse/metrics.go (1)

130-147: Consider pre-allocating the byte slice to reduce allocations.

[]byte("") is slightly wasteful. Since metrics endpoints can be polled frequently, pre-allocating with an estimated capacity would reduce allocations during repeated appends.

♻️ Suggested improvement
-		lines := []byte("")
+		// Pre-allocate ~512 bytes to reduce append reallocations
+		lines := make([]byte, 0, 512)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@middleware/sse/metrics.go` around lines 130 - 147, The byte-slice `lines` in
the metrics builder is created as `[]byte("")`, causing repeated reallocations;
pre-allocate a capacity before appends to reduce allocations by replacing that
creation with a `make([]byte, 0, N)` using a conservative estimated capacity
(e.g., 1024 or tuned by average metrics size) so subsequent calls to appendProm
and loops over snap.ConnectionsByTopic / snap.EventsByType avoid growing the
slice; update the initialization near where `lines` is declared (used with
appendProm and escapePromLabelValue) and ensure tests still pass.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: f4c9e7c9-4741-4d13-9341-c47c2ad770a9

📥 Commits

Reviewing files that changed from the base of the PR and between f1d3a7c and 8be8414.

📒 Files selected for processing (11)
  • middleware/sse/README.md
  • middleware/sse/auth.go
  • middleware/sse/coalescer.go
  • middleware/sse/connection.go
  • middleware/sse/domain_event.go
  • middleware/sse/event.go
  • middleware/sse/metrics.go
  • middleware/sse/replayer.go
  • middleware/sse/sse.go
  • middleware/sse/sse_test.go
  • middleware/sse/topic.go
✅ Files skipped from review due to trivial changes (3)
  • middleware/sse/README.md
  • middleware/sse/coalescer.go
  • middleware/sse/sse.go
🚧 Files skipped from review as they are similar to previous changes (4)
  • middleware/sse/topic.go
  • middleware/sse/replayer.go
  • middleware/sse/connection.go
  • middleware/sse/sse_test.go

@vinod-morya
Author

Addressed nitpick feedback in c226977:

  • Dead CoalesceKey on instant events — Removed from all Invalidate* and DomainEvent helpers. CoalesceKey is only meaningful for PriorityCoalesced; instant events bypass the coalescer entirely.
  • Pre-allocate batched slice — newCoalescer now initializes with make([]MarshaledEvent, 0, 16) to avoid first-append allocation.
  • Added Test_SSE_Shutdown_Idempotent — Calls Shutdown() twice, verifies no panic (sync.Once guard).

@vinod-morya force-pushed the feat/middleware-sse branch from c226977 to 3f2386e on April 7, 2026 at 11:12
Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 5

♻️ Duplicate comments (1)
middleware/sse/fanout.go (1)

110-112: ⚠️ Potential issue | 🟡 Minor

Transform still cannot intentionally keep PriorityInstant.

PriorityInstant and “unset” are both 0, so this branch still overwrites an explicit instant priority whenever cfg.Priority != 0. That means config defaults are not fully overrideable.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@middleware/sse/fanout.go` around lines 110 - 112, Transform currently
conflates PriorityInstant (0) with "unset", so change the representation to
distinguish unset from explicit instant: introduce a sentinel constant
PriorityUnset (e.g., -1) and ensure new events are initialized with
PriorityUnset instead of 0; then update the branch in Transform (the code
checking event.Priority and cfg.Priority) to only apply defaults when
event.Priority == PriorityUnset and cfg.Priority != 0; update all places that
construct events or check priority to use PriorityUnset and preserve explicit
PriorityInstant, referencing event.Priority, cfg.Priority, PriorityInstant and
the new PriorityUnset.
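The sentinel-priority fix the comment proposes can be sketched as below. `PriorityUnset` and `applyDefaultPriority` are hypothetical names taken from the prompt, not existing middleware API:

```go
package main

import "fmt"

// Hypothetical priority constants; PriorityUnset (-1) is distinguishable
// from the zero-valued PriorityInstant, so an unset priority can be told
// apart from an explicitly chosen instant one.
const (
	PriorityUnset     = -1
	PriorityInstant   = 0
	PriorityBatched   = 1
	PriorityCoalesced = 2
)

type Event struct{ Priority int }

// applyDefaultPriority fills in the config default only when the event's
// priority was never set, so an explicit PriorityInstant survives.
func applyDefaultPriority(e *Event, cfgPriority int) {
	if e.Priority == PriorityUnset && cfgPriority != PriorityUnset {
		e.Priority = cfgPriority
	}
}

func main() {
	explicit := Event{Priority: PriorityInstant} // caller chose instant
	applyDefaultPriority(&explicit, PriorityBatched)
	fmt.Println(explicit.Priority) // stays instant

	unset := Event{Priority: PriorityUnset}
	applyDefaultPriority(&unset, PriorityBatched)
	fmt.Println(unset.Priority) // takes the config default
}
```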
🧹 Nitpick comments (2)
middleware/sse/metrics.go (1)

112-119: Redundant Content-Type header.

c.JSON() already sets Content-Type: application/json automatically. Line 116 is unnecessary.

♻️ Suggested simplification
 func (h *Hub) MetricsHandler() fiber.Handler {
 	return func(c fiber.Ctx) error {
 		includeConns := c.Query("connections") == "true"
 		snap := h.Metrics(includeConns)
-		c.Set("Content-Type", "application/json")
 		return c.JSON(snap)
 	}
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@middleware/sse/metrics.go` around lines 112 - 119, The MetricsHandler sets
the Content-Type header redundantly before calling c.JSON; remove the explicit
c.Set("Content-Type", "application/json") line from Hub.MetricsHandler so the
handler simply computes includeConns, calls h.Metrics(includeConns), and returns
c.JSON(snap), relying on c.JSON to set the header automatically.
middleware/sse/replayer.go (1)

70-76: Minor doc inaccuracy: map allocation occurs on every Store call.

The doc comment claims "zero allocations" once full, but line 73 allocates a new map on every Store call. This is necessary for efficient topic matching during Replay, but the comment is misleading.

📝 Suggested doc fix
 // Store adds an event to the replay buffer. Once full, overwrites the
-// oldest entry (O(1), zero allocations).
+// oldest entry (O(1) buffer insertion).
 func (r *MemoryReplayer) Store(event MarshaledEvent, topics []string) error {
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@middleware/sse/replayer.go` around lines 70 - 76, The comment on
MemoryReplayer.Store is misleading: it states "zero allocations" once full but
the function allocates a new topicSet map on every call (see
MemoryReplayer.Store and the topicSet := make(map[string]struct{}, len(topics))
line). Update the doc comment to remove or qualify the "zero allocations"
claim—e.g., state that ring-buffer storage avoids allocations once full but a
per-call topics map is allocated for matching—or otherwise accurately describe
that topicSet is allocated each Store invocation.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@middleware/sse/connection.go`:
- Around line 121-124: The connMatchesGroup function incorrectly treats missing
metadata keys as empty strings; update connMatchesGroup (which takes *Connection
and group map[string]string and reads conn.Metadata) to explicitly check key
existence before comparing values: for each k in group, get val, ok :=
conn.Metadata[k]; if !ok return false; then if val != v return false; this
ensures a requested key must exist on Connection.Metadata rather than matching
absent keys as "".
- Around line 66-115: trySend can enqueue messages after Close() because it
doesn't check the done channel and Close() only closes done; update trySend to
also select on c.done (return false immediately if <-c.done) before sending to
c.send to prevent enqueuing after shutdown, and adjust Close() to also close the
send channel (or otherwise ensure send is drained) so writeLoop won't receive
buffered messages after done is signaled; refer to the trySend method, the
writeLoop function, and the Close() implementation when making the changes.

In `@middleware/sse/domain_event.go`:
- Around line 117-129: BatchDomainEvents currently builds topics from the
caller-owned specs slice but stores that same specs slice in Event.Data before
calling Publish, which can lead to races or payload drift because Publish is
non-blocking and serialization happens later; to fix, make a deep copy of the
specs slice (and copy any mutable nested Hint/map fields if present) before
assigning it to Event.Data so the enqueued Event holds an immutable snapshot;
update the code paths around BatchDomainEvents, Event{Data: ...}, and any use of
the caller-owned specs or Hint maps to use the copied slice/structures when
calling Publish.

In `@middleware/sse/event.go`:
- Around line 112-125: The WriteTo method is writing MarshaledEvent.ID and
MarshaledEvent.Type directly into SSE control lines, allowing CR/LF to break
framing; sanitize me.ID and me.Type before serializing in WriteTo by stripping
or replacing any '\r' and '\n' characters (e.g., replace with space or remove)
so no newline characters can be injected into the "id: " or "event: " lines, and
then use the sanitized values in the fmt.Fprintf calls to preserve total byte
accounting and existing error handling.

In `@middleware/sse/fanout.go`:
- Around line 50-66: Validate cfg.Subscriber before spawning the goroutine in
Hub.FanOut: if cfg.Subscriber is nil, return a no-op cancel function (or cancel
the context immediately) and log/handle the error so the caller fails fast
instead of letting the background goroutine panic; specifically check
cfg.Subscriber prior to creating ctx/cancel or starting the go func, and avoid
calling cfg.Subscriber.Subscribe(...) when cfg.Subscriber is nil (the Subscribe
call is the unsafe call to guard).

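The newline-stripping fix the event.go comment above asks for can be sketched as follows. `sanitizeSSEField` is a hypothetical helper, not part of the middleware; the point is that CR/LF in an `id:` or `event:` value would otherwise terminate the control line and inject extra SSE fields:

```go
package main

import (
	"fmt"
	"strings"
)

// sanitizeSSEField strips CR and LF so a value interpolated into an
// "id:" or "event:" control line cannot inject additional SSE lines.
func sanitizeSSEField(s string) string {
	return strings.NewReplacer("\r", "", "\n", "").Replace(s)
}

func main() {
	// A hostile event ID attempting to smuggle a data line into the frame.
	id := "evt-1\ndata: injected"
	fmt.Printf("id: %s\n", sanitizeSSEField(id))
}
```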

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: c16b0d83-c6f9-4026-8aae-4be7f898d430

📥 Commits

Reviewing files that changed from the base of the PR and between c226977 and 3f2386e.

📒 Files selected for processing (17)
  • middleware/sse/README.md
  • middleware/sse/auth.go
  • middleware/sse/coalescer.go
  • middleware/sse/config.go
  • middleware/sse/connection.go
  • middleware/sse/domain_event.go
  • middleware/sse/event.go
  • middleware/sse/example_test.go
  • middleware/sse/fanout.go
  • middleware/sse/invalidation.go
  • middleware/sse/metrics.go
  • middleware/sse/replayer.go
  • middleware/sse/sse.go
  • middleware/sse/sse_test.go
  • middleware/sse/stats.go
  • middleware/sse/throttle.go
  • middleware/sse/topic.go
✅ Files skipped from review due to trivial changes (5)
  • middleware/sse/throttle.go
  • middleware/sse/README.md
  • middleware/sse/config.go
  • middleware/sse/sse_test.go
  • middleware/sse/sse.go
🚧 Files skipped from review as they are similar to previous changes (1)
  • middleware/sse/example_test.go

Comment on lines +66 to +115
```go
func (c *Connection) trySend(msg any) bool {
	select {
	case c.send <- msg:
		return true
	default:
		c.MessagesDropped.Add(1)
		return false
	}
}

// writeLoop runs inside Fiber's SendStreamWriter. It reads from the send
// channel and writes SSE-formatted events to the bufio.Writer.
func (c *Connection) writeLoop(w *bufio.Writer) {
	for {
		select {
		case <-c.done:
			return
		case msg, ok := <-c.send:
			if !ok {
				return
			}
			switch me := msg.(type) {
			case heartbeatSignal:
				_ = me // type-safe signal, no data
				if err := writeComment(w, "heartbeat"); err != nil {
					c.Close()
					return
				}
				if err := w.Flush(); err != nil {
					c.Close()
					return
				}
			case MarshaledEvent:
				if _, err := me.WriteTo(w); err != nil {
					c.Close()
					return
				}
				if err := w.Flush(); err != nil {
					c.Close()
					return
				}
				c.MessagesSent.Add(1)
				c.lastWrite.Store(time.Now())
				if me.ID != "" {
					c.LastEventID.Store(me.ID)
				}
			default:
				// Unknown message type, skip
			}
		}
```
Contributor


⚠️ Potential issue | 🟠 Major

Closed connections can still accept messages due to missing synchronization on close.

trySend() never checks if the connection is closed, so it can successfully enqueue messages even after Close() is called. Additionally, Close() only closes the done channel but never closes the send channel. This creates a race condition: when writeLoop receives the close signal, it may have buffered messages still in the channel that get delivered after shutdown is initiated. Add a check in trySend() to fail immediately if done is closed, ensuring no messages can be enqueued after close.
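The guard the reviewer describes can be sketched with a simplified `Connection`. The real type carries metrics and metadata; this only shows the done-channel check that makes enqueue-after-close impossible:

```go
package main

import "fmt"

// Simplified stand-in for the middleware's Connection (hypothetical shape).
type Connection struct {
	send chan any
	done chan struct{}
}

func NewConnection(buf int) *Connection {
	return &Connection{send: make(chan any, buf), done: make(chan struct{})}
}

func (c *Connection) Close() { close(c.done) }

// trySend refuses to enqueue once done is closed, so no message can be
// buffered after shutdown has been signaled.
func (c *Connection) trySend(msg any) bool {
	select {
	case <-c.done:
		return false // connection closed: drop immediately
	default:
	}
	select {
	case <-c.done:
		return false
	case c.send <- msg:
		return true
	default:
		return false // buffer full: drop (non-blocking)
	}
}

func main() {
	c := NewConnection(1)
	fmt.Println(c.trySend("before close")) // succeeds
	c.Close()
	fmt.Println(c.trySend("after close")) // rejected
}
```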


Comment on lines +121 to +124
```go
func connMatchesGroup(conn *Connection, group map[string]string) bool {
	for k, v := range group {
		if conn.Metadata[k] != v {
			return false
```
Contributor


⚠️ Potential issue | 🔴 Critical

Require the metadata key to exist in connMatchesGroup.

A direct map lookup treats missing keys as "", so a filter like {"tenant_id": ""} matches connections that do not have tenant_id at all. This can weaken group scoping and unintentionally allow events to reach wrong recipients.

Fix
 func connMatchesGroup(conn *Connection, group map[string]string) bool {
 	for k, v := range group {
-		if conn.Metadata[k] != v {
+		got, ok := conn.Metadata[k]
+		if !ok || got != v {
 			return false
 		}
 	}
 	return true
 }

Comment on lines +117 to +129
```go
	topicSet := make(map[string]struct{})
	for _, s := range specs {
		topicSet[s.Resource] = struct{}{}
	}
	topics := make([]string, 0, len(topicSet))
	for t := range topicSet {
		topics = append(topics, t)
	}
	batchEvt := Event{
		Type:     "batch",
		Topics:   topics,
		Data:     specs,
		Priority: PriorityInstant,
```
Contributor


⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

# First, let's explore the file structure
fd -t f "domain_event.go"

Repository: gofiber/fiber

Length of output: 87


🏁 Script executed:

# Read the domain_event.go file to understand the full context
cat -n middleware/sse/domain_event.go

Repository: gofiber/fiber

Length of output: 4699


🏁 Script executed:

# Find the Publish method in the Hub type
rg -A 20 "func \(h \*Hub\) Publish" middleware/sse/

Repository: gofiber/fiber

Length of output: 1294


🏁 Script executed:

# Find the Event struct definition
rg -B 2 -A 10 "type Event struct" middleware/sse/

Repository: gofiber/fiber

Length of output: 665


🏁 Script executed:

# Check how the Data field is used in event processing
rg -B 5 -A 10 "event.Data" middleware/sse/

Repository: gofiber/fiber

Length of output: 39


🏁 Script executed:

# Look for where events are consumed from the channel and how Data is serialized
rg -B 5 -A 15 "h.events <-" middleware/sse/

Repository: gofiber/fiber

Length of output: 1294


🏁 Script executed:

# Search for where events are consumed from the channel and processed
rg -B 5 -A 15 "for.*h.events" middleware/sse/

Repository: gofiber/fiber

Length of output: 39


🏁 Script executed:

# Look for JSON marshaling of the Data field
rg -B 3 -A 10 "json.Marshal" middleware/sse/

Repository: gofiber/fiber

Length of output: 2121


🏁 Script executed:

# Check if there's any test that might show mutation issues
fd -t f "test" middleware/sse/

Repository: gofiber/fiber

Length of output: 114


🏁 Script executed:

# Read the event.go file to understand the MarshalEvent function
cat -n middleware/sse/event.go

Repository: gofiber/fiber

Length of output: 5548


🏁 Script executed:

# Check the sse.go file to find where MarshalEvent is called and how events flow
rg -B 5 -A 20 "MarshalEvent" middleware/sse/sse.go

Repository: gofiber/fiber

Length of output: 39


🏁 Script executed:

# Look for the event processing loop in sse.go
cat -n middleware/sse/sse.go | head -200

Repository: gofiber/fiber

Length of output: 7346


🏁 Script executed:

# Search for where the event channel is read
rg -n "case.*:=" middleware/sse/sse.go | head -20

Repository: gofiber/fiber

Length of output: 156


🏁 Script executed:

# Look at what happens when an event is received from the channel
sed -n '387,450p' middleware/sse/sse.go

Repository: gofiber/fiber

Length of output: 1451


🏁 Script executed:

# Look at the routeEvent function
rg -B 5 -A 30 "func \(h \*Hub\) routeEvent" middleware/sse/sse.go

Repository: gofiber/fiber

Length of output: 1137


🏁 Script executed:

# Look at the deliverToConn function to see how event is used
rg -B 3 -A 20 "func \(h \*Hub\) deliverToConn" middleware/sse/sse.go

Repository: gofiber/fiber

Length of output: 762


🏁 Script executed:

# Look at the test file to see if there are examples of usage patterns
head -100 middleware/sse/sse_test.go

Repository: gofiber/fiber

Length of output: 2501


🏁 Script executed:

# Check if there's any documentation in the file about caller responsibilities
rg -B 5 -A 10 "BatchDomainEvents" middleware/sse/domain_event.go

Repository: gofiber/fiber

Length of output: 885


🏁 Script executed:

# Check if there's any documentation about defensive copying in other domain event methods
rg -B 10 -A 10 "func \(h \*Hub\) DomainEvent" middleware/sse/domain_event.go



🏁 Script executed:

# Verify the Event struct definition and confirm Data is any
rg -B 2 -A 10 "type Event struct" middleware/sse/event.go



🏁 Script executed:

# Let me trace through one more time to confirm the timing of JSON marshaling
# Search for where marshalEvent is called to understand the timing
rg -n "marshalEvent" middleware/sse/



🏁 Script executed:

# Verify if there's any synchronous processing that would happen before JSON marshaling
sed -n '470,530p' middleware/sse/sse.go



🏁 Script executed:

# Check how other similar methods handle caller-owned data
rg -B 5 -A 15 "func \(h \*Hub\) (DomainEvent|Progress|Complete)" middleware/sse/domain_event.go



🏁 Script executed:

# Confirm the full flow one more time - check if any methods do defensive copying of slices
rg -B 3 -A 5 "make\(\[\].*EventSpec\)" middleware/sse/



🏁 Script executed:

# Check if there are any tests showing spec mutation patterns
rg -B 5 -A 10 "BatchDomainEvents" middleware/sse/sse_test.go



Copy specs before enqueueing the batch event.

BatchDomainEvents computes topics from the input slice, then stores the caller-owned specs slice directly in Event.Data. Since Publish() is non-blocking and JSON marshaling happens asynchronously in the hub's event loop, the caller can modify specs or nested Hint maps after the call returns. This causes the emitted payload to drift from the routed topics or race with serialization.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@middleware/sse/domain_event.go` around lines 117 - 129, BatchDomainEvents
currently builds topics from the caller-owned specs slice but stores that same
specs slice in Event.Data before calling Publish, which can lead to races or
payload drift because Publish is non-blocking and serialization happens later;
to fix, make a deep copy of the specs slice (and copy any mutable nested
Hint/map fields if present) before assigning it to Event.Data so the enqueued
Event holds an immutable snapshot; update the code paths around
BatchDomainEvents, Event{Data: ...}, and any use of the caller-owned specs or
Hint maps to use the copied slice/structures when calling Publish.
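The fix the prompt describes is a snapshot taken before the slice crosses into the hub goroutine. A minimal standalone sketch — the `EventSpec` shape with a mutable `Hint` map is assumed from the finding above, not copied from the PR:

```go
package main

import "fmt"

// EventSpec is a stand-in for the middleware's spec type (assumed shape).
type EventSpec struct {
	Topic string
	Hint  map[string]string
}

// snapshotSpecs deep-copies the caller-owned slice, including the mutable
// Hint maps, so the enqueued event holds an immutable snapshot.
func snapshotSpecs(specs []EventSpec) []EventSpec {
	out := make([]EventSpec, len(specs))
	for i, s := range specs {
		out[i] = s
		if s.Hint != nil {
			h := make(map[string]string, len(s.Hint))
			for k, v := range s.Hint {
				h[k] = v
			}
			out[i].Hint = h
		}
	}
	return out
}

func main() {
	specs := []EventSpec{{Topic: "orders", Hint: map[string]string{"id": "1"}}}
	snap := snapshotSpecs(specs)

	// Caller mutates after "publishing" — the snapshot is unaffected.
	specs[0].Topic = "mutated"
	specs[0].Hint["id"] = "999"

	fmt.Println(snap[0].Topic, snap[0].Hint["id"]) // orders 1
}
```

With this, `BatchDomainEvents` could compute topics and fill `Event.Data` from the same snapshot, so the routed topics and the marshaled payload can no longer diverge.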

Comment on lines +50 to +66
func (h *Hub) FanOut(cfg FanOutConfig) context.CancelFunc { //nolint:gocritic // hugeParam: public API, value semantics preferred
ctx, cancel := context.WithCancel(context.Background())

topic := cfg.Topic
if topic == "" {
topic = cfg.Channel
}

go func() {
for {
select {
case <-ctx.Done():
return
default:
}

err := cfg.Subscriber.Subscribe(ctx, cfg.Channel, func(payload string) {
Contributor

⚠️ Potential issue | 🟠 Major

❓ Verification inconclusive

Script executed:

# First, let's locate the fanout.go file and understand its structure
find . -name "fanout.go" -type f

Repository: gofiber/fiber
Exit code: 0

stdout:

./middleware/sse/fanout.go

Script executed:

# Get the full content of the FanOut method to see lines 50-66 in context
cat -n middleware/sse/fanout.go | head -100

Repository: gofiber/fiber
Exit code: 0

stdout:

     1	package sse
     2	
     3	import (
     4		"context"
     5		"time"
     6	
     7		"github.com/gofiber/fiber/v3/log"
     8	)
     9	
    10	// PubSubSubscriber abstracts a pub/sub system (Redis, NATS, etc.) for
    11	// auto-fan-out from an external message broker into the SSE hub.
    12	type PubSubSubscriber interface {
    13		// Subscribe listens on the given channel and sends received messages
    14		// to the provided callback. It blocks until ctx is canceled.
    15		Subscribe(ctx context.Context, channel string, onMessage func(payload string)) error
    16	}
    17	
    18	// FanOutConfig configures auto-fan-out from an external pub/sub to the hub.
    19	type FanOutConfig struct {
    20		// Subscriber is the pub/sub implementation (Redis, NATS, etc.).
    21		Subscriber PubSubSubscriber
    22	
    23		// Transform optionally transforms the raw pub/sub message before
    24		// publishing to the hub. Return nil to skip the message.
    25		Transform func(payload string) *Event
    26	
    27		// Channel is the pub/sub channel to subscribe to.
    28		Channel string
    29	
    30		// Topic is the SSE topic to publish events to. If empty, Channel is used.
    31		Topic string
    32	
    33		// EventType is the SSE event type. Required.
    34		EventType string
    35	
    36		// CoalesceKey for PriorityCoalesced events.
    37		CoalesceKey string
    38	
    39		// TTL for events. Zero means no expiration.
    40		TTL time.Duration
    41	
    42		// Priority for delivered events. Note: PriorityInstant is 0 (the zero value),
    43		// so it is always the default if not set explicitly.
    44		Priority Priority
    45	}
    46	
    47	// FanOut starts a goroutine that subscribes to an external pub/sub channel
    48	// and automatically publishes received messages to the SSE hub.
    49	// Returns a cancel function to stop the fan-out.
    50	func (h *Hub) FanOut(cfg FanOutConfig) context.CancelFunc { //nolint:gocritic // hugeParam: public API, value semantics preferred
    51		ctx, cancel := context.WithCancel(context.Background())
    52	
    53		topic := cfg.Topic
    54		if topic == "" {
    55			topic = cfg.Channel
    56		}
    57	
    58		go func() {
    59			for {
    60				select {
    61				case <-ctx.Done():
    62					return
    63				default:
    64				}
    65	
    66				err := cfg.Subscriber.Subscribe(ctx, cfg.Channel, func(payload string) {
    67					event := h.buildFanOutEvent(&cfg, topic, payload)
    68					if event != nil {
    69						h.Publish(*event)
    70					}
    71				})
    72	
    73				if err != nil && ctx.Err() == nil {
    74					h.logFanOutError(cfg.Channel, err)
    75					select {
    76					case <-time.After(3 * time.Second):
    77					case <-ctx.Done():
    78						return
    79					}
    80				}
    81			}
    82		}()
    83	
    84		return cancel
    85	}
    86	
    87	// buildFanOutEvent creates an Event from a raw pub/sub payload.
    88	func (*Hub) buildFanOutEvent(cfg *FanOutConfig, topic, payload string) *Event {
    89		var event Event
    90	
    91		if cfg.Transform != nil {
    92			transformed := cfg.Transform(payload)
    93			if transformed == nil {
    94				return nil
    95			}
    96			event = *transformed
    97		} else {
    98			event = Event{
    99				Type: cfg.EventType,
   100				Data: payload,

Script executed:

# Check the FanOutConfig struct definition
rg "type FanOutConfig struct" -A 10 middleware/sse/

Repository: gofiber/fiber
Exit code: 0

stdout:

middleware/sse/fanout.go:type FanOutConfig struct {
middleware/sse/fanout.go-	// Subscriber is the pub/sub implementation (Redis, NATS, etc.).
middleware/sse/fanout.go-	Subscriber PubSubSubscriber
middleware/sse/fanout.go-
middleware/sse/fanout.go-	// Transform optionally transforms the raw pub/sub message before
middleware/sse/fanout.go-	// publishing to the hub. Return nil to skip the message.
middleware/sse/fanout.go-	Transform func(payload string) *Event
middleware/sse/fanout.go-
middleware/sse/fanout.go-	// Channel is the pub/sub channel to subscribe to.
middleware/sse/fanout.go-	Channel string
middleware/sse/fanout.go-

Script executed:

# Check if there's any validation of cfg.Subscriber before Subscribe call
rg "cfg\.Subscriber" middleware/sse/fanout.go -B 2 -A 2

Repository: gofiber/fiber
Exit code: 0

stdout:

			}

			err := cfg.Subscriber.Subscribe(ctx, cfg.Channel, func(payload string) {
				event := h.buildFanOutEvent(&cfg, topic, payload)
				if event != nil {

Add validation to reject fan-out configs without a usable Subscriber.

The FanOut method spawns a goroutine at line 58 and immediately calls cfg.Subscriber.Subscribe(ctx, ...) at line 66 without checking if Subscriber is nil. A misconfigured FanOutConfig will cause a panic in the background goroutine instead of failing immediately. Validate Subscriber before spawning the goroutine so the error can be returned to the caller.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@middleware/sse/fanout.go` around lines 50 - 66, Validate cfg.Subscriber
before spawning the goroutine in Hub.FanOut: if cfg.Subscriber is nil, return a
no-op cancel function (or cancel the context immediately) and log/handle the
error so the caller fails fast instead of letting the background goroutine
panic; specifically check cfg.Subscriber prior to creating ctx/cancel or
starting the go func, and avoid calling cfg.Subscriber.Subscribe(...) when
cfg.Subscriber is nil (the Subscribe call is the unsafe call to guard).
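For illustration, one fail-fast shape for this guard: validate before spawning the goroutine and return an error to the caller (types here are simplified stand-ins for the config shown above; the project could equally choose to panic):

```go
package main

import (
	"context"
	"fmt"
)

// Subscriber mirrors the PubSubSubscriber interface from the review comment.
type Subscriber interface {
	Subscribe(ctx context.Context, channel string, onMessage func(payload string)) error
}

// Config is a trimmed stand-in for FanOutConfig.
type Config struct {
	Subscriber Subscriber
	Channel    string
}

// fanOut validates the config before any goroutine is spawned, so a nil
// Subscriber fails at the call site instead of panicking asynchronously.
func fanOut(cfg Config) (context.CancelFunc, error) {
	if cfg.Subscriber == nil {
		return nil, fmt.Errorf("sse: fan-out subscriber is nil")
	}
	ctx, cancel := context.WithCancel(context.Background())
	go func() {
		_ = cfg.Subscriber.Subscribe(ctx, cfg.Channel, func(string) {})
	}()
	return cancel, nil
}

func main() {
	if _, err := fanOut(Config{}); err != nil {
		fmt.Println("rejected:", err)
	}
}
```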

@gaby gaby changed the title feat: Add SSE (Server-Sent Events) middleware 🔥 feat: Add SSE (Server-Sent Events) middleware Apr 7, 2026
@gaby gaby moved this to In Progress in v3 Apr 7, 2026
@codecov

codecov bot commented Apr 7, 2026

Codecov Report

❌ Patch coverage is 88.57965% with 119 lines in your changes missing coverage. Please review.
✅ Project coverage is 91.02%. Comparing base (4ea5bef) to head (98903a1).

Files with missing lines Patch % Lines
middleware/sse/sse.go 83.48% 34 Missing and 20 partials ⚠️
middleware/sse/auth.go 79.76% 12 Missing and 5 partials ⚠️
middleware/sse/event.go 77.77% 8 Missing and 8 partials ⚠️
middleware/sse/connection.go 81.25% 8 Missing and 4 partials ⚠️
middleware/sse/stats.go 76.00% 4 Missing and 2 partials ⚠️
middleware/sse/metrics.go 95.95% 2 Missing and 2 partials ⚠️
middleware/sse/replayer.go 92.98% 2 Missing and 2 partials ⚠️
middleware/sse/throttle.go 93.18% 2 Missing and 1 partial ⚠️
middleware/sse/topic.go 91.30% 1 Missing and 1 partial ⚠️
middleware/sse/fanout.go 98.07% 1 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #4196      +/-   ##
==========================================
- Coverage   91.18%   91.02%   -0.17%     
==========================================
  Files         123      137      +14     
  Lines       11847    12889    +1042     
==========================================
+ Hits        10803    11732     +929     
- Misses        658      727      +69     
- Partials      386      430      +44     
Flag Coverage Δ
unittests 91.02% <88.57%> (-0.17%) ⬇️


@vinod-morya
Author

Addressed all remaining CodeRabbit feedback + coverage boost in 98903a1:

Security: Sanitize MarshaledEvent.ID and Type — strip \r\n to prevent SSE framing injection.

Type safety: Replaced chan any with typed chan MarshaledEvent + separate heartbeat chan struct{}. Compile-time safe, no more type switches.

Correctness:

  • Metadata frozen (defensive copy) after OnConnect to prevent concurrent mutation
  • FanOut Transform path now fully respects all fields including PriorityInstant
  • Removed redundant Content-Type from MetricsHandler

Coverage: 69% → 91% — Added 40+ tests. Codecov will update on this commit.

@gaby
Member

gaby commented Apr 7, 2026

@vinod-morya Can you tell Claude to document the middleware using the same format as the other middlewares?

  • remove the readme.md
  • update whats_new.md
  • fix modernize lint ci issues

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 3

♻️ Duplicate comments (6)
middleware/sse/fanout.go (1)

50-66: ⚠️ Potential issue | 🟠 Major

Fail fast when Subscriber is nil.

Line 66 dereferences cfg.Subscriber inside the background goroutine, so a misconfiguration turns into an asynchronous panic instead of a deterministic failure at the call site. Validate it before spawning the goroutine and panic there if the config is invalid. Based on learnings: In the gofiber/fiber framework, service startup failures should panic rather than allowing the application to continue running with degraded functionality, as this is the agreed-upon design decision.

💡 Minimal fix
 func (h *Hub) FanOut(cfg FanOutConfig) context.CancelFunc { //nolint:gocritic // hugeParam: public API, value semantics preferred
+	if cfg.Subscriber == nil {
+		panic("sse: fan-out subscriber is nil")
+	}
+
 	ctx, cancel := context.WithCancel(context.Background())
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@middleware/sse/fanout.go` around lines 50 - 66, The FanOut method currently
dereferences cfg.Subscriber inside the background goroutine, risking an
asynchronous panic; before spawning the goroutine in Hub.FanOut, validate that
cfg.Subscriber != nil (and any other required cfg fields like cfg.Channel/Topic
if desired) and call panic with a clear message if it's nil so the failure is
deterministic at startup; locate the FanOut function and the use of
cfg.Subscriber.Subscribe to add this pre-check and panic early.
middleware/sse/connection.go (1)

125-128: ⚠️ Potential issue | 🔴 Critical

Require group keys to exist before matching.

Line 127 treats an absent metadata key as "", so a selector like map[string]string{"tenant_id": ""} also matches connections that do not have tenant_id at all. That weakens group scoping.

💡 Minimal fix
 func connMatchesGroup(conn *Connection, group map[string]string) bool {
 	for k, v := range group {
-		if conn.Metadata[k] != v {
+		got, ok := conn.Metadata[k]
+		if !ok || got != v {
 			return false
 		}
 	}
 	return true
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@middleware/sse/connection.go` around lines 125 - 128, The connMatchesGroup
function currently treats missing metadata keys as empty strings which lets
selectors like {"tenant_id": ""} match connections without that key; update
connMatchesGroup to first check for key presence in conn.Metadata (e.g., use the
map lookup pattern to get value and ok) and return false if the key is missing,
then compare the value to v and return false on mismatch, otherwise continue;
this ensures group keys must exist before matching.
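The fix hinges on Go's comma-ok map lookup: plain indexing on a missing key yields the zero value `""`, which is indistinguishable from a stored empty string. A standalone illustration of the difference:

```go
package main

import "fmt"

// matches reports whether meta satisfies every selector key, requiring
// the key to be present — not merely zero-valued — before comparing.
func matches(meta, selector map[string]string) bool {
	for k, v := range selector {
		got, ok := meta[k]
		if !ok || got != v {
			return false
		}
	}
	return true
}

func main() {
	noTenant := map[string]string{"user_id": "42"}
	selector := map[string]string{"tenant_id": ""}

	// Plain indexing would report a match here; the ok-check does not.
	fmt.Println(noTenant["tenant_id"] == "") // true — the false positive
	fmt.Println(matches(noTenant, selector)) // false — key must exist
}
```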
middleware/sse/event.go (1)

116-133: ⚠️ Potential issue | 🟠 Major

Sanitize MarshaledEvent.ID and Type in WriteTo() too.

marshalEvent() strips CR/LF, but MarshaledEvent is exported and external replayers can return arbitrary values. Lines 119-128 currently serialize ID and Type verbatim, so a replayed or manually constructed event can still inject extra SSE fields/events.

💡 Minimal fix
 func (me *MarshaledEvent) WriteTo(w io.Writer) (int64, error) {
 	var total int64
+	id := sanitizeSSEField(me.ID)
+	eventType := sanitizeSSEField(me.Type)
 
-	if me.ID != "" {
-		n, err := fmt.Fprintf(w, "id: %s\n", me.ID)
+	if id != "" {
+		n, err := fmt.Fprintf(w, "id: %s\n", id)
 		total += int64(n)
 		if err != nil {
 			return total, fmt.Errorf("sse: write id: %w", err)
 		}
 	}
 
-	if me.Type != "" {
-		n, err := fmt.Fprintf(w, "event: %s\n", me.Type)
+	if eventType != "" {
+		n, err := fmt.Fprintf(w, "event: %s\n", eventType)
 		total += int64(n)
 		if err != nil {
 			return total, fmt.Errorf("sse: write event: %w", err)
 		}
 	}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@middleware/sse/event.go` around lines 116 - 133, The WriteTo method on
MarshaledEvent currently writes MarshaledEvent.ID and MarshaledEvent.Type
verbatim which allows CR/LF injection; update MarshaledEvent.WriteTo to sanitize
ID and Type the same way marshalEvent() does (strip CR and LF or otherwise
reject/escape newline characters) before calling fmt.Fprintf, e.g., normalize or
call the existing sanitizer used by marshalEvent to produce safe id/type
strings, and then write those sanitized values to w so external replayers cannot
inject extra SSE fields via ID or Type.
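The sanitizer the diff references can be as small as a `strings.NewReplacer`. A sketch — the PR's helper is named `sanitizeSSEField`, but its body is assumed here, not quoted:

```go
package main

import (
	"fmt"
	"strings"
)

// sseFieldSanitizer strips the CR/LF characters that would let a value
// terminate an SSE field early and smuggle in extra fields or events.
var sseFieldSanitizer = strings.NewReplacer("\r", "", "\n", "")

func sanitizeSSEField(s string) string {
	return sseFieldSanitizer.Replace(s)
}

func main() {
	malicious := "evt-1\ndata: injected"
	// Without sanitizing, the payload after \n would become its own SSE line.
	fmt.Printf("id: %s\n", sanitizeSSEField(malicious))
}
```

Applying this in `WriteTo` as well as `marshalEvent` closes the path where an external `Replayer` hands back unsanitized `MarshaledEvent` values.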
middleware/sse/sse_test.go (2)

397-401: ⚠️ Potential issue | 🟠 Major

Replace the fixed sleep with actual synchronization.

This 50ms pause is racing the scheduler instead of the condition under test. The same pattern repeats later in the file (for example Line 460, Line 483, Line 797, Line 1475, and Line 1983). For the stats-only cases you can assert immediately because Publish increments the counter synchronously; for delivery/replay checks, wait on a channel or use require.Eventually. Based on learnings: In the Test_App_ShutdownWithContext function in app_test.go, the clientDone channel is used to synchronize the client's request completion before proceeding, eliminating the need for additional time.Sleep calls.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@middleware/sse/sse_test.go` around lines 397 - 401, The fixed 50ms sleep
racing the scheduler should be removed: for the stats-only assertion after
hub.Publish (see hub.Publish and hub.Stats in the test) assert immediately
because Publish increments EventsPublished synchronously; for tests that verify
delivery/replay replace time.Sleep with proper synchronization—either wait on a
clientDone-style channel (see Test_App_ShutdownWithContext’s clientDone pattern)
that the test client closes when it finishes, or use require.Eventually to poll
the expected condition; update other occurrences (the other sleeps in this test
file) to follow the same approach so tests wait on the real condition instead of
sleeping.

658-669: ⚠️ Potential issue | 🟠 Major

Don't use request timeout as the success signal.

A handler that hangs before writing headers satisfies the same expectation as a healthy long-lived SSE stream here. Add an explicit server-side signal first—e.g. an OnConnect channel or require.Eventually on hub.Stats().ActiveConnections == 1—and only then allow the client timeout.

Also applies to: 695-704

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@middleware/sse/sse_test.go` around lines 658 - 669, The test currently treats
a client timeout as success, which can be a false positive if the handler never
wrote headers; change the test to first wait for a server-side signal that the
SSE connection was accepted (for example by waiting with require.Eventually(()
-> hub.Stats().ActiveConnections == 1) or by using an OnConnect channel exposed
by your SSE hub) and only after that perform the client request/Test with a
short timeout to assert the stream stays open; apply the same change to the
duplicate case around the 695-704 block so both tests assert
hub.Stats().ActiveConnections (or OnConnect) before relying on the client
timeout.
middleware/sse/sse.go (1)

153-176: ⚠️ Potential issue | 🔴 Critical

Wait for run() to acknowledge registration.

Line 173 only confirms the request was queued. Until run() actually processes it, live events can still be missed; if the stream exits first, the deferred unregister at Lines 154-159 can run ahead of register and the later addConnection leaves a closed connection back in connections/topicIndex. This still needs an acked register request plus a registered flag for cleanup.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@middleware/sse/sse.go` around lines 153 - 176, The register send currently
only queues the request (hub.register <- conn) so run() may process unregister
first; change the registration protocol to send a registration request object
that includes the conn and a reply/ack channel, e.g., send {conn, ackChan}, then
wait for the ack (select on ackChan and hub.shutdown) before proceeding; add a
local registered bool and set it only after ack returns true, and update the
defer to only send to hub.unregister when registered is true (and still guard
with hub.shutdown), so run()/addConnection never stores a closed connection and
cleanup is consistent.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@middleware/sse/sse.go`:
- Around line 316-321: The synthetic "connected" MarshaledEvent should not
include an ID because it isn't stored in the replayer and will cause Replay to
anchor on an unknown ID; remove the ID assignment (do not call nextEventID())
when constructing the connected event in the connected variable (or set ID to an
empty value) so the emitted handshake has no id field, leaving replayable events
to drive client anchoring.
- Around line 286-305: The replay path in Hub.replayEvents currently writes
every returned MarshaledEvent without checking per-event expiry; before calling
me.WriteTo(w) skip any event whose CreatedAt+TTL is already in the past (i.e.,
stale) so short-lived events that expired while the client was disconnected are
not replayed. In practice, inside replayEvents (after obtaining events and
before WriteTo) compute expiry := me.CreatedAt.Add(time.Duration(me.TTL) *
time.Second) (or equivalent based on TTL units), compare with time.Now(), and
continue (skip) if expiry.Before(now); leave events with zero/zero-value TTL as
non-expiring if that’s the convention. Ensure you reference
MarshaledEvent.CreatedAt, MarshaledEvent.TTL and the existing WriteTo call when
making the change.
- Around line 354-368: The shutdown logic currently races between
watchShutdown() and run() causing writeLoop to exit before the shutdown event
flushes; change the sequencing so the hub's run() is the single coordinator: in
run()'s shutdown branch (the code that currently calls conn.Close() at the
shutdown path) first send the MarshaledEvent "server-shutdown" to each
Connection using conn.trySend (or enqueue it via the same path the hub uses for
normal events), then sleep shutdownDrainDelay, and only after that call
conn.Close(); remove or neutralize the conn.Close() call inside watchShutdown()
so it no longer races with the hub; ensure watchShutdown() only triggers the
send request or unsubscribes rather than closing the connection itself.

---

Duplicate comments:
In `@middleware/sse/connection.go`:
- Around line 125-128: The connMatchesGroup function currently treats missing
metadata keys as empty strings which lets selectors like {"tenant_id": ""} match
connections without that key; update connMatchesGroup to first check for key
presence in conn.Metadata (e.g., use the map lookup pattern to get value and ok)
and return false if the key is missing, then compare the value to v and return
false on mismatch, otherwise continue; this ensures group keys must exist before
matching.

In `@middleware/sse/event.go`:
- Around line 116-133: The WriteTo method on MarshaledEvent currently writes
MarshaledEvent.ID and MarshaledEvent.Type verbatim which allows CR/LF injection;
update MarshaledEvent.WriteTo to sanitize ID and Type the same way
marshalEvent() does (strip CR and LF or otherwise reject/escape newline
characters) before calling fmt.Fprintf, e.g., normalize or call the existing
sanitizer used by marshalEvent to produce safe id/type strings, and then write
those sanitized values to w so external replayers cannot inject extra SSE fields
via ID or Type.

In `@middleware/sse/fanout.go`:
- Around line 50-66: The FanOut method currently dereferences cfg.Subscriber
inside the background goroutine, risking an asynchronous panic; before spawning
the goroutine in Hub.FanOut, validate that cfg.Subscriber != nil (and any other
required cfg fields like cfg.Channel/Topic if desired) and call panic with a
clear message if it's nil so the failure is deterministic at startup; locate the
FanOut function and the use of cfg.Subscriber.Subscribe to add this pre-check
and panic early.

In `@middleware/sse/sse_test.go`:
- Around line 397-401: The fixed 50ms sleep racing the scheduler should be
removed: for the stats-only assertion after hub.Publish (see hub.Publish and
hub.Stats in the test) assert immediately because Publish increments
EventsPublished synchronously; for tests that verify delivery/replay replace
time.Sleep with proper synchronization—either wait on a clientDone-style channel
(see Test_App_ShutdownWithContext’s clientDone pattern) that the test client
closes when it finishes, or use require.Eventually to poll the expected
condition; update other occurrences (the other sleeps in this test file) to
follow the same approach so tests wait on the real condition instead of
sleeping.
- Around line 658-669: The test currently treats a client timeout as success,
which can be a false positive if the handler never wrote headers; change the
test to first wait for a server-side signal that the SSE connection was accepted
(for example by waiting with require.Eventually(() ->
hub.Stats().ActiveConnections == 1) or by using an OnConnect channel exposed by
your SSE hub) and only after that perform the client request/Test with a short
timeout to assert the stream stays open; apply the same change to the duplicate
case around the 695-704 block so both tests assert hub.Stats().ActiveConnections
(or OnConnect) before relying on the client timeout.

In `@middleware/sse/sse.go`:
- Around line 153-176: The register send currently only queues the request
(hub.register <- conn) so run() may process unregister first; change the
registration protocol to send a registration request object that includes the
conn and a reply/ack channel, e.g., send {conn, ackChan}, then wait for the ack
(select on ackChan and hub.shutdown) before proceeding; add a local registered
bool and set it only after ack returns true, and update the defer to only send
to hub.unregister when registered is true (and still guard with hub.shutdown),
so run()/addConnection never stores a closed connection and cleanup is
consistent.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: c82d9988-7c1a-4eb2-8838-dd8ae3d94601

📥 Commits

Reviewing files that changed from the base of the PR and between 3f2386e and 98903a1.

📒 Files selected for processing (6)
  • middleware/sse/connection.go
  • middleware/sse/event.go
  • middleware/sse/fanout.go
  • middleware/sse/metrics.go
  • middleware/sse/sse.go
  • middleware/sse/sse_test.go

Comment on lines +316 to +321
connected := MarshaledEvent{
ID: nextEventID(),
Type: "connected",
Data: fmt.Sprintf(`{"connection_id":%q,"topics":%s}`, conn.ID, string(topicsJSON)),
Retry: -1,
}
Contributor

⚠️ Potential issue | 🟠 Major

Don't give the synthetic connected event an id:.

This event is written directly and never stored in the replayer. If the client reconnects before receiving a real replayable event, it will anchor on this unknown ID and Replay returns nil, so events published after the handshake are skipped. Emit connected without an id: or persist it too.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@middleware/sse/sse.go` around lines 316 - 321, The synthetic "connected"
MarshaledEvent should not include an ID because it isn't stored in the replayer
and will cause Replay to anchor on an unknown ID; remove the ID assignment (do
not call nextEventID()) when constructing the connected event in the connected
variable (or set ID to an empty value) so the emitted handshake has no id field,
leaving replayable events to drive client anchoring.

Comment on lines +354 to +368
func (h *Hub) watchShutdown(conn *Connection) {
go func() {
select {
case <-h.shutdown:
if !conn.IsClosed() {
shutdownEvt := MarshaledEvent{
ID: nextEventID(),
Type: "server-shutdown",
Data: "{}",
Retry: -1,
}
conn.trySend(shutdownEvt)
time.Sleep(shutdownDrainDelay)
}
conn.Close()
Contributor

⚠️ Potential issue | 🟠 Major

server-shutdown is still best-effort, not guaranteed.

Closing h.shutdown wakes both watchShutdown() and the shutdown branch in run(). run() immediately calls conn.Close() at Lines 408-411, so many writeLoops will exit before the queued shutdown event is flushed. If this event is part of the contract, sequence it from the hub loop first and only close connections after shutdownDrainDelay.

Also applies to: 407-413

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@middleware/sse/sse.go` around lines 354 - 368, The shutdown logic currently
races between watchShutdown() and run() causing writeLoop to exit before the
shutdown event flushes; change the sequencing so the hub's run() is the single
coordinator: in run()'s shutdown branch (the code that currently calls
conn.Close() at the shutdown path) first send the MarshaledEvent
"server-shutdown" to each Connection using conn.trySend (or enqueue it via the
same path the hub uses for normal events), then sleep shutdownDrainDelay, and
only after that call conn.Close(); remove or neutralize the conn.Close() call
inside watchShutdown() so it no longer races with the hub; ensure
watchShutdown() only triggers the send request or unsubscribes rather than
closing the connection itself.

Add production-grade Server-Sent Events middleware built natively for
Fiber's fasthttp architecture with proper client disconnect detection.

Features: Hub-based broker, 3 priority lanes, NATS-style topic wildcards,
adaptive throttling, connection groups, JWT/ticket auth, cache invalidation
helpers, Prometheus metrics, Last-Event-ID replay, Redis/NATS fan-out,
and graceful Kubernetes-style drain.

91% test coverage, golangci-lint clean, go test -race clean.

Resolves gofiber#4194
@vinod-morya vinod-morya force-pushed the feat/middleware-sse branch from 98903a1 to 1ca6cb3 Compare April 7, 2026 16:53
@vinod-morya
Author

Thanks @gaby! All asks addressed, plus one extra:

  • Removed README.md from middleware folder
  • Added docs/middleware/sse.md following the same format as other middlewares
  • Updated docs/whats_new.md with SSE section
  • Fixed all modernize lint issues (maps.Copy, min/max builtins, range over int, strings.SplitSeq)
  • Also added nil-guard panic in FanOut() per CodeRabbit

One clean commit, 91% coverage. Let me know if anything else needs adjusting! 🤙


Projects

Status: In Progress

Development

Successfully merging this pull request may close these issues.

3 participants