-
-
Notifications
You must be signed in to change notification settings - Fork 2k
π₯ feat: Add SSE (Server-Sent Events) middleware #4196
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. Weβll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
vinod-morya
wants to merge
1
commit into
gofiber:main
Choose a base branch
from
vinod-morya:feat/middleware-sse
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,119 @@ | ||
| --- | ||
| id: sse | ||
| --- | ||
|
|
||
| # SSE | ||
|
|
||
| Server-Sent Events middleware for [Fiber](https://github.com/gofiber/fiber) that provides a production-grade SSE broker built natively on Fiber's fasthttp architecture. It includes a Hub-based event broker with topic routing, event coalescing (last-writer-wins), three priority lanes (instant/batched/coalesced), NATS-style topic wildcards, adaptive per-connection throttling, connection groups, built-in JWT and ticket auth helpers, Prometheus metrics, graceful Kubernetes-style drain, auto fan-out from Redis/NATS, and pluggable Last-Event-ID replay. | ||
|
|
||
| ## Signatures | ||
|
|
||
| ```go | ||
| func New(config ...Config) fiber.Handler | ||
| func NewWithHub(config ...Config) (fiber.Handler, *Hub) | ||
| ``` | ||
|
|
||
| ## Examples | ||
|
|
||
| Import the middleware package: | ||
|
|
||
| ```go | ||
| import ( | ||
| "github.com/gofiber/fiber/v3" | ||
| "github.com/gofiber/fiber/v3/middleware/sse" | ||
| ) | ||
| ``` | ||
|
|
||
| Once your Fiber app is initialized, create an SSE handler and hub: | ||
|
|
||
| ```go | ||
| // Basic usage β subscribe all clients to "notifications" | ||
| handler, hub := sse.NewWithHub(sse.Config{ | ||
| OnConnect: func(c fiber.Ctx, conn *sse.Connection) error { | ||
| conn.Topics = []string{"notifications"} | ||
| return nil | ||
| }, | ||
| }) | ||
| app.Get("/events", handler) | ||
|
|
||
| // Publish an event from any goroutine | ||
| hub.Publish(sse.Event{ | ||
| Type: "update", | ||
| Data: "hello", | ||
| Topics: []string{"notifications"}, | ||
| }) | ||
| ``` | ||
|
|
||
| Use JWT authentication and metadata-based groups for multi-tenant isolation: | ||
|
|
||
| ```go | ||
| handler, hub := sse.NewWithHub(sse.Config{ | ||
| OnConnect: sse.JWTAuth(func(token string) (map[string]string, error) { | ||
| claims, err := validateJWT(token) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
| return map[string]string{ | ||
| "user_id": claims.UserID, | ||
| "tenant_id": claims.TenantID, | ||
| }, nil | ||
| }), | ||
| }) | ||
| app.Get("/events", handler) | ||
|
|
||
| // Publish only to a specific tenant | ||
| hub.DomainEvent("orders", "created", orderID, tenantID, nil) | ||
| ``` | ||
|
|
||
| Use event coalescing to reduce traffic for high-frequency updates: | ||
|
|
||
| ```go | ||
| // Progress events use PriorityCoalesced β if progress goes 5%β8% | ||
| // in one flush window, only 8% is sent to the client. | ||
| hub.Progress("import", importID, tenantID, current, total, nil) | ||
|
|
||
| // Completion events use PriorityInstant β always delivered immediately. | ||
| hub.Complete("import", importID, tenantID, true, map[string]any{ | ||
| "rows_imported": 1500, | ||
| }) | ||
| ``` | ||
|
|
||
| Use fan-out to bridge an external pub/sub system into the SSE hub: | ||
|
|
||
| ```go | ||
| cancel := hub.FanOut(sse.FanOutConfig{ | ||
| Subscriber: redisSubscriber, | ||
| Channel: "events:orders", | ||
| EventType: "order-update", | ||
| Topic: "orders", | ||
| }) | ||
| defer cancel() | ||
| ``` | ||
|
|
||
| ## Config | ||
|
|
||
| | Property | Type | Description | Default | | ||
| | :---------------- | :------------------------------------------------ | :------------------------------------------------------------------------------------------------------------------- | :------------- | | ||
| | Next | `func(fiber.Ctx) bool` | Next defines a function to skip this middleware when returned true. | `nil` | | ||
| | OnConnect | `func(fiber.Ctx, *Connection) error` | Called when a new client connects. Set `conn.Topics` and `conn.Metadata` here. Return error to reject (sends 403). | `nil` | | ||
| | OnDisconnect | `func(*Connection)` | Called after a client disconnects. | `nil` | | ||
| | OnPause | `func(*Connection)` | Called when a connection is paused (browser tab hidden). | `nil` | | ||
| | OnResume | `func(*Connection)` | Called when a connection is resumed (browser tab visible). | `nil` | | ||
| | Replayer | `Replayer` | Enables Last-Event-ID replay. If nil, replay is disabled. | `nil` | | ||
| | FlushInterval | `time.Duration` | How often batched (P1) and coalesced (P2) events are flushed to clients. Instant (P0) events bypass this. | `2s` | | ||
| | HeartbeatInterval | `time.Duration` | How often a comment is sent to idle connections to detect disconnects and prevent proxy timeouts. | `30s` | | ||
| | MaxLifetime | `time.Duration` | Maximum duration a single SSE connection can stay open. Set to -1 for unlimited. | `30m` | | ||
| | SendBufferSize | `int` | Per-connection channel buffer. If full, events are dropped. | `256` | | ||
| | RetryMS | `int` | Reconnection interval hint sent to clients via the `retry:` directive on connect. | `3000` | | ||
|
|
||
| ## Default Config | ||
|
|
||
| ```go | ||
| var ConfigDefault = Config{ | ||
| FlushInterval: 2 * time.Second, | ||
| SendBufferSize: 256, | ||
| HeartbeatInterval: 30 * time.Second, | ||
| MaxLifetime: 30 * time.Minute, | ||
| RetryMS: 3000, | ||
| } | ||
| ``` |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,187 @@ | ||
| package sse | ||
|
|
||
| import ( | ||
| "crypto/rand" | ||
| "encoding/hex" | ||
| "errors" | ||
| "fmt" | ||
| "maps" | ||
| "runtime" | ||
| "strings" | ||
| "sync" | ||
| "time" | ||
|
|
||
| "github.com/gofiber/fiber/v3" | ||
| ) | ||
|
|
||
| // JWTAuth returns an OnConnect handler that validates a JWT Bearer token | ||
| // from the Authorization header or a token query parameter. | ||
| // | ||
| // The validateFunc receives the raw token string and should return the | ||
| // claims as a map. Return an error to reject the connection. | ||
| func JWTAuth(validateFunc func(token string) (map[string]string, error)) func(fiber.Ctx, *Connection) error { | ||
| return func(c fiber.Ctx, conn *Connection) error { | ||
| token := "" | ||
|
|
||
| const bearerPrefix = "Bearer " | ||
| auth := c.Get("Authorization") | ||
| if len(auth) > len(bearerPrefix) && strings.EqualFold(auth[:len(bearerPrefix)], bearerPrefix) { | ||
| token = auth[len(bearerPrefix):] | ||
| } | ||
|
|
||
| if token == "" { | ||
| token = c.Query("token") | ||
| } | ||
|
|
||
| if token == "" { | ||
| return errors.New("missing authentication token") | ||
| } | ||
|
|
||
| claims, err := validateFunc(token) | ||
| if err != nil { | ||
| return fmt.Errorf("authentication failed: %w", err) | ||
| } | ||
|
|
||
| maps.Copy(conn.Metadata, claims) | ||
|
|
||
| return nil | ||
| } | ||
| } | ||
|
|
||
| // TicketStore is the interface for ticket-based SSE authentication. | ||
| // Implement this with Redis, in-memory, or any key-value store. | ||
| type TicketStore interface { | ||
| // Set stores a ticket with the given value and TTL. | ||
| Set(ticket, value string, ttl time.Duration) error | ||
|
|
||
| // GetDel atomically retrieves and deletes a ticket (one-time use). | ||
| // Returns empty string and nil error if not found. | ||
| GetDel(ticket string) (string, error) | ||
| } | ||
|
|
||
| // MemoryTicketStore is an in-memory TicketStore for development and testing. | ||
| // Call Close to stop the background cleanup goroutine. | ||
| type MemoryTicketStore struct { | ||
| tickets map[string]memTicket | ||
| done chan struct{} | ||
| mu sync.Mutex | ||
| closeOnce sync.Once | ||
| } | ||
|
|
||
| type memTicket struct { | ||
| expires time.Time | ||
| value string | ||
| } | ||
|
|
||
| // NewMemoryTicketStore creates an in-memory ticket store with a background | ||
| // cleanup goroutine that evicts expired tickets every 30 seconds. | ||
| func NewMemoryTicketStore() *MemoryTicketStore { | ||
| s := &MemoryTicketStore{ | ||
| tickets: make(map[string]memTicket), | ||
| done: make(chan struct{}), | ||
| } | ||
| go func() { | ||
| ticker := time.NewTicker(30 * time.Second) | ||
| defer ticker.Stop() | ||
| for { | ||
| select { | ||
| case <-ticker.C: | ||
| s.mu.Lock() | ||
| now := time.Now() | ||
| for k, v := range s.tickets { | ||
| if now.After(v.expires) { | ||
| delete(s.tickets, k) | ||
| } | ||
| } | ||
| s.mu.Unlock() | ||
| case <-s.done: | ||
| return | ||
| } | ||
| } | ||
| }() | ||
|
|
||
| // Prevent goroutine leak if caller forgets to call Close. | ||
| runtime.SetFinalizer(s, func(s *MemoryTicketStore) { | ||
| s.Close() | ||
| }) | ||
|
|
||
| return s | ||
| } | ||
|
|
||
| // Close stops the background cleanup goroutine. Safe to call multiple times. | ||
| func (s *MemoryTicketStore) Close() { | ||
| s.closeOnce.Do(func() { | ||
| close(s.done) | ||
| }) | ||
| } | ||
|
|
||
| // Set stores a ticket with the given value and TTL. | ||
| func (s *MemoryTicketStore) Set(ticket, value string, ttl time.Duration) error { | ||
| s.mu.Lock() | ||
| defer s.mu.Unlock() | ||
| s.tickets[ticket] = memTicket{value: value, expires: time.Now().Add(ttl)} | ||
| return nil | ||
| } | ||
|
|
||
| // GetDel atomically retrieves and deletes a ticket (one-time use). | ||
| func (s *MemoryTicketStore) GetDel(ticket string) (string, error) { | ||
| s.mu.Lock() | ||
| defer s.mu.Unlock() | ||
| t, ok := s.tickets[ticket] | ||
| if !ok { | ||
| return "", nil | ||
| } | ||
| delete(s.tickets, ticket) | ||
| if time.Now().After(t.expires) { | ||
| return "", nil | ||
| } | ||
| return t.value, nil | ||
| } | ||
|
|
||
| // TicketAuth returns an OnConnect handler that validates a one-time ticket | ||
| // from the ticket query parameter. | ||
| func TicketAuth( | ||
| store TicketStore, | ||
| parseValue func(value string) (metadata map[string]string, topics []string, err error), | ||
| ) func(fiber.Ctx, *Connection) error { | ||
| return func(c fiber.Ctx, conn *Connection) error { | ||
| ticket := c.Query("ticket") | ||
| if ticket == "" { | ||
| return errors.New("missing ticket parameter") | ||
| } | ||
|
|
||
| value, err := store.GetDel(ticket) | ||
| if err != nil { | ||
| return fmt.Errorf("ticket validation error: %w", err) | ||
| } | ||
| if value == "" { | ||
| return errors.New("invalid or expired ticket") | ||
| } | ||
|
|
||
| metadata, topics, err := parseValue(value) | ||
| if err != nil { | ||
| return fmt.Errorf("ticket parse error: %w", err) | ||
| } | ||
|
|
||
| maps.Copy(conn.Metadata, metadata) | ||
| if len(topics) > 0 { | ||
| conn.Topics = topics | ||
| } | ||
|
|
||
| return nil | ||
| } | ||
| } | ||
|
|
||
| // IssueTicket creates a one-time ticket and stores it. Returns the | ||
| // ticket string that the client should pass as ?ticket=<value>. | ||
| func IssueTicket(store TicketStore, value string, ttl time.Duration) (string, error) { | ||
| b := make([]byte, 24) | ||
| if _, err := rand.Read(b); err != nil { | ||
| return "", fmt.Errorf("failed to generate ticket: %w", err) | ||
| } | ||
| ticket := hex.EncodeToString(b) | ||
| if err := store.Set(ticket, value, ttl); err != nil { | ||
| return "", err | ||
| } | ||
| return ticket, nil | ||
| } | ||
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.