Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 54 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,60 @@ forward "/graphiql",

See the API documentation for `Absinthe.Plug.GraphiQL` for more information.

## Incremental Delivery

Absinthe.Plug supports GraphQL `@defer` and `@stream` directives for incremental delivery over HTTP using Server-Sent Events (SSE). This enables real-time streaming of deferred fragments and list items while maintaining HTTP compatibility.

Key features:
- ✅ **Server-Sent Events**: Standards-compliant SSE implementation
- ✅ **HTTP/2 Compatible**: Efficient multiplexing support
- ✅ **CORS Support**: Cross-origin streaming capabilities
- ✅ **Graceful Fallback**: Automatic fallback to standard GraphQL responses

**Installation with incremental delivery:**

```elixir
def deps do
[
{:absinthe, git: "https://github.com/gigsmart/absinthe.git", branch: "gigmart/defer-stream-incremental"},
{:absinthe_plug, git: "https://github.com/gigsmart/absinthe_plug.git", branch: "gigmart/defer-stream-incremental"},
{:plug, "~> 1.12"},
{:jason, "~> 1.2"}
]
end
```

**Example usage:**

```javascript
// Client-side SSE connection
const eventSource = new EventSource('/api/graphql/stream?' + new URLSearchParams({
query: `
query GetPosts {
posts @stream(initialCount: 3, label: "posts") {
id
title
... @defer(label: "content") {
content
}
}
}
`
}));

eventSource.addEventListener('initial', (event) => {
const data = JSON.parse(event.data);
console.log('Initial data:', data);
});

eventSource.addEventListener('incremental', (event) => {
const increment = JSON.parse(event.data);
console.log('Incremental data:', increment);
});
```

For comprehensive documentation on HTTP incremental delivery patterns, see [Absinthe Incremental Delivery Guide](https://hexdocs.pm/absinthe/incremental-delivery.html).

## Community

The project is under constant improvement by a growing list of
Expand Down
7 changes: 4 additions & 3 deletions lib/absinthe/plug.ex
Original file line number Diff line number Diff line change
Expand Up @@ -364,11 +364,12 @@ defmodule Absinthe.Plug do
end

def subscribe(conn, topic, %{context: %{pubsub: pubsub}} = config) do
alias Absinthe.Plug.Incremental.SSE.ConnectionManager

pubsub.subscribe(topic)

conn
|> put_resp_header("content-type", "text/event-stream")
|> send_chunked(200)
|> ConnectionManager.setup_sse_headers()
|> subscribe_loop(topic, config)
end

Expand All @@ -389,7 +390,7 @@ defmodule Absinthe.Plug do
conn
after
30_000 ->
case chunk(conn, ":ping\n\n") do
case chunk(conn, ": keep-alive\n\n") do
{:ok, conn} ->
subscribe_loop(conn, topic, config)

Expand Down
156 changes: 156 additions & 0 deletions lib/absinthe/plug/incremental/sse.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
defmodule Absinthe.Plug.Incremental.SSE do
@moduledoc """
Server-Sent Events (SSE) transport for incremental delivery.

This module implements incremental delivery over HTTP using SSE,
allowing @defer and @stream directives to work over standard HTTP connections.

## Usage

# In your Phoenix router
pipeline :graphql_streaming do
plug Absinthe.Plug.Incremental.SSE
end

scope "/api" do
pipe_through :graphql_streaming

post "/graphql/stream", GraphQLController, :stream_query
end

## Example Query with Streaming

query GetUsers {
users @stream(initialCount: 2, label: "users") {
id
name
... @defer(label: "profile") {
profile {
bio
avatar
}
}
}
}

The response will be delivered as a series of SSE events:
- `initial` event with the first 2 users
- `incremental` events for remaining users and deferred profiles
- `complete` event when all data is delivered
"""

use Absinthe.Incremental.Transport
import Plug.Conn

require Logger

alias Absinthe.Plug.Incremental.SSE.{EventFormatter, ConnectionManager, QueryProcessor}

@impl true
def init(conn, options) do
if ConnectionManager.accepts_sse?(conn) do
conn = ConnectionManager.setup_sse_headers(conn)

if Keyword.get(options, :keep_alive, true) do
ConnectionManager.schedule_keep_alive()
end

{:ok, %{
conn: conn,
operation_id: Keyword.get(options, :operation_id),
event_id: 0,
options: options
}}
else
{:error, :sse_not_accepted}
end
end

@impl true
def send_initial(state, response) do
event_data = EventFormatter.format_event("initial", response, state.event_id)

case chunk(state.conn, event_data) do
{:ok, conn} ->
{:ok, %{state |
conn: conn,
event_id: state.event_id + 1
}}

{:error, reason} ->
Logger.error("Failed to send initial SSE response: #{inspect(reason)}")
{:error, {:transport_error, reason}}
end
end

@impl true
def send_incremental(state, response) do
event_data = EventFormatter.format_event("incremental", response, state.event_id)

case chunk(state.conn, event_data) do
{:ok, conn} ->
{:ok, %{state |
conn: conn,
event_id: state.event_id + 1
}}

{:error, reason} ->
Logger.error("Failed to send incremental SSE response: #{inspect(reason)}")
{:error, {:transport_error, reason}}
end
end

@impl true
def complete(state) do
event_data = EventFormatter.format_event("complete", %{}, state.event_id)

case chunk(state.conn, event_data) do
{:ok, conn} ->
chunk(conn, "")
:ok

{:error, reason} ->
Logger.error("Failed to send complete SSE event: #{inspect(reason)}")
{:error, {:transport_error, reason}}
end
end

@impl true
def handle_error(state, error) do
error_response = EventFormatter.format_error_response(error)
event_data = EventFormatter.format_event("error", error_response, state.event_id)

case chunk(state.conn, event_data) do
{:ok, conn} ->
{:ok, %{state |
conn: conn,
event_id: state.event_id + 1
}}

{:error, reason} ->
Logger.error("Failed to send error SSE event: #{inspect(reason)}")
{:error, {:transport_error, reason}}
end
end

@doc """
Handle keep-alive to prevent connection timeout.
"""
def handle_keep_alive(state) do
case chunk(state.conn, ": keep-alive\n\n") do
{:ok, conn} ->
ConnectionManager.schedule_keep_alive()
{:ok, %{state | conn: conn}}

{:error, _reason} ->
{:error, :connection_closed}
end
end

@doc """
Process a GraphQL query with incremental delivery over SSE.
"""
def process_query(conn, schema, query, variables \\ %{}, options \\ []) do
QueryProcessor.process(conn, schema, query, variables, options)
end
end
51 changes: 51 additions & 0 deletions lib/absinthe/plug/incremental/sse/connection_manager.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
defmodule Absinthe.Plug.Incremental.SSE.ConnectionManager do
@moduledoc """
Manages SSE connection lifecycle and headers.

This module handles connection setup, keep-alive functionality,
and proper SSE header configuration.
"""

import Plug.Conn

@content_type "text/event-stream"
@keep_alive_interval 30_000 # 30 seconds

@doc """
Check if the client accepts SSE responses.
"""
@spec accepts_sse?(Plug.Conn.t()) :: boolean()
def accepts_sse?(conn) do
case get_req_header(conn, "accept") do
[] -> false
headers ->
Enum.any?(headers, fn header ->
String.contains?(header, "text/event-stream") or
String.contains?(header, "*/*")
end)
end
end

@doc """
Setup proper SSE headers on the connection.
"""
@spec setup_sse_headers(Plug.Conn.t()) :: Plug.Conn.t()
def setup_sse_headers(conn) do
conn
|> put_resp_header("content-type", @content_type)
|> put_resp_header("cache-control", "no-cache")
|> put_resp_header("connection", "keep-alive")
|> put_resp_header("x-accel-buffering", "no") # Disable Nginx buffering
|> put_resp_header("access-control-allow-origin", "*") # CORS support
|> put_resp_header("access-control-allow-headers", "cache-control")
|> send_chunked(200)
end

@doc """
Schedule a keep-alive message to prevent connection timeout.
"""
@spec schedule_keep_alive() :: reference()
def schedule_keep_alive do
Process.send_after(self(), :keep_alive, @keep_alive_interval)
end
end
52 changes: 52 additions & 0 deletions lib/absinthe/plug/incremental/sse/event_formatter.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
defmodule Absinthe.Plug.Incremental.SSE.EventFormatter do
@moduledoc """
Handles formatting of SSE events for incremental delivery.

This module is responsible for converting GraphQL responses into
properly formatted SSE event data.
"""

@doc """
Format a GraphQL response as an SSE event.

## Parameters
- `event_type` - The type of event (initial, incremental, complete, error)
- `data` - The response data to include in the event
- `event_id` - Unique identifier for this event

## Returns
A binary string formatted as an SSE event.
"""
@spec format_event(String.t(), map(), non_neg_integer()) :: binary()
def format_event(event_type, data, event_id) do
encoded = Jason.encode!(data)

[
"id: #{event_id}\n",
"event: #{event_type}\n",
"data: #{encoded}\n",
"\n"
]
|> IO.iodata_to_binary()
end

@doc """
Format error data for SSE transmission.
"""
@spec format_error_response(any()) :: map()
def format_error_response(error) when is_binary(error) do
%{errors: [%{message: error}]}
end

def format_error_response(error) when is_map(error) do
%{errors: [error]}
end

def format_error_response(errors) when is_list(errors) do
%{errors: errors}
end

def format_error_response(error) do
%{errors: [%{message: inspect(error)}]}
end
end
Loading