From 5e18bc6aceb719843b47dc27faedd6472bf20a89 Mon Sep 17 00:00:00 2001 From: Lachlan Hunt Date: Tue, 17 Mar 2026 22:54:21 +1100 Subject: [PATCH] Add WPT tests for EventSource.fromReadableStream() Tests cover construction and initial state, message receiving, event stream parsing (comments, custom events, multi-line data, newline formats, BOM handling, chunk splitting), id and retry fields, close() behaviour, stream done and error handling, no reconnection, and the TransformStream reconnection pattern. Includes fetch() integration tests using the existing message.py server resource. --- ...ource-from-readable-stream-fetch.window.js | 27 ++ .../eventsource-from-readable-stream.any.js | 399 ++++++++++++++++++ 2 files changed, 426 insertions(+) create mode 100644 eventsource/eventsource-from-readable-stream-fetch.window.js create mode 100644 eventsource/eventsource-from-readable-stream.any.js diff --git a/eventsource/eventsource-from-readable-stream-fetch.window.js b/eventsource/eventsource-from-readable-stream-fetch.window.js new file mode 100644 index 00000000000000..adfa1ace12b680 --- /dev/null +++ b/eventsource/eventsource-from-readable-stream-fetch.window.js @@ -0,0 +1,27 @@ +// META: title=EventSource: fromReadableStream() with fetch() + +async_test(function(t) { + fetch("resources/message.py").then(t.step_func(function(response) { + const source = EventSource.fromReadableStream(response.body); + source.onmessage = t.step_func(function(e) { + assert_equals(e.data, "data"); + source.close(); + t.done(); + }); + })); +}, "works with a fetch() response body"); + +async_test(function(t) { + fetch("resources/message.py?message=id%3A%2042%0Aretry%3A%205000%0Adata%3A%20hello").then( + t.step_func(function(response) { + const source = EventSource.fromReadableStream(response.body); + source.onmessage = t.step_func(function(e) { + assert_equals(e.data, "hello"); + assert_equals(source.lastEventId, "42"); + assert_equals(source.reconnectionTime, 5000); + source.close(); + t.done(); + }); + }) + ); +}, "lastEventId and reconnectionTime work with fetch() response"); diff --git a/eventsource/eventsource-from-readable-stream.any.js b/eventsource/eventsource-from-readable-stream.any.js new file mode 100644 index 00000000000000..3935ed477300d9 --- /dev/null +++ b/eventsource/eventsource-from-readable-stream.any.js @@ -0,0 +1,399 @@ +// META: title=EventSource: fromReadableStream() + +// Helper: create a ReadableStream from a string of event-stream data +function streamFromString(str) { + const bytes = new TextEncoder().encode(str); + return new ReadableStream({ + start(controller) { + controller.enqueue(bytes); + controller.close(); + } + }); +} + +// Helper: create a ReadableStream that enqueues multiple chunks +function streamFromChunks(chunks) { + return new ReadableStream({ + start(controller) { + for (const chunk of chunks) { + controller.enqueue(new TextEncoder().encode(chunk)); + } + controller.close(); + } + }); +} + +// Helper: create a ReadableStream that errors +function streamThatErrors(error) { + return new ReadableStream({ + start(controller) { + controller.error(error); + } + }); +} + +// --- Construction and initial state --- + +test(function() { + const source = EventSource.fromReadableStream(streamFromString("")); + assert_true(source instanceof EventSource); + source.close(); +}, "fromReadableStream() returns an EventSource instance"); + +test(function() { + const source = EventSource.fromReadableStream(streamFromString("")); + assert_equals(source.readyState, EventSource.CONNECTING); + source.close(); +}, "readyState is CONNECTING immediately after construction"); + +test(function() { + const source = EventSource.fromReadableStream(streamFromString("")); + assert_equals(source.url, "about:event-stream"); + source.close(); +}, "url returns about:event-stream"); + +test(function() { + const source = EventSource.fromReadableStream(streamFromString("")); + assert_equals(source.withCredentials, false); + source.close(); +}, "withCredentials returns false"); + +test(function() { + const source = EventSource.fromReadableStream(streamFromString("")); + assert_equals(source.lastEventId, ""); + source.close(); +}, "lastEventId initially returns the empty string"); + +test(function() { + const source = EventSource.fromReadableStream(streamFromString("")); + assert_greater_than(source.reconnectionTime, 0); + source.close(); +}, "reconnectionTime initially returns a positive value"); + +// --- Error conditions --- + +test(function() { + const stream = new ReadableStream(); + stream.getReader(); // locks the stream + assert_throws_js(TypeError, function() { + EventSource.fromReadableStream(stream); + }); +}, "throws TypeError if the stream is locked"); + +// --- open event --- + +async_test(function(t) { + const source = EventSource.fromReadableStream(streamFromString("data: hello\n\n")); + source.onopen = t.step_func(function(e) { + assert_equals(e.type, "open"); + assert_equals(source.readyState, EventSource.OPEN); + source.close(); + t.done(); + }); +}, "fires open event"); + +// --- Receiving messages --- + +async_test(function(t) { + const source = EventSource.fromReadableStream( + streamFromString("data: hello\n\n") + ); + source.onmessage = t.step_func(function(e) { + assert_equals(e.data, "hello"); + source.close(); + t.done(); + }); +}, "receives a message event"); + +async_test(function(t) { + const source = EventSource.fromReadableStream( + streamFromString("data: hello\ndata: world\n\n") + ); + source.onmessage = t.step_func(function(e) { + assert_equals(e.data, "hello\nworld"); + source.close(); + t.done(); + }); +}, "handles multi-line data fields"); + +async_test(function(t) { + const count = { value: 0 }; + const source = EventSource.fromReadableStream( + streamFromString("data: first\n\ndata: second\n\n") + ); + source.onmessage = t.step_func(function(e) { + if (count.value === 0) { + assert_equals(e.data, "first"); + } else if (count.value === 1) { + assert_equals(e.data, "second"); + source.close(); + t.done(); + } + count.value++; + }); +}, "receives multiple message events"); + +async_test(function(t) { + const source = EventSource.fromReadableStream( + streamFromString("event: custom\ndata: hello\n\n") + ); + source.addEventListener("custom", t.step_func(function(e) { + assert_equals(e.data, "hello"); + source.close(); + t.done(); + })); +}, "handles custom event types"); + +async_test(function(t) { + const source = EventSource.fromReadableStream( + streamFromString(": this is a comment\ndata: hello\n\n") + ); + source.onmessage = t.step_func(function(e) { + assert_equals(e.data, "hello"); + source.close(); + t.done(); + }); +}, "ignores comments"); + +async_test(function(t) { + const source = EventSource.fromReadableStream( + streamFromString("data: hello\n\n") + ); + source.onmessage = t.step_func(function(e) { + assert_equals(e.origin, "null"); + source.close(); + t.done(); + }); +}, "message event origin is 'null' (opaque origin)"); + +// --- Chunk splitting --- + +async_test(function(t) { + const source = EventSource.fromReadableStream( + streamFromChunks(["da", "ta: hel", "lo\n\n"]) + ); + source.onmessage = t.step_func(function(e) { + assert_equals(e.data, "hello"); + source.close(); + t.done(); + }); +}, "handles data split across chunks"); + +// --- id and retry fields --- + +async_test(function(t) { + const source = EventSource.fromReadableStream( + streamFromString("id: 42\ndata: hello\n\n") + ); + source.onmessage = t.step_func(function(e) { + assert_equals(e.lastEventId, "42"); + assert_equals(source.lastEventId, "42"); + source.close(); + t.done(); + }); +}, "id field updates lastEventId"); + +async_test(function(t) { + const source = EventSource.fromReadableStream( + streamFromString("retry: 5000\ndata: hello\n\n") + ); + source.onmessage = t.step_func(function(e) { + assert_equals(source.reconnectionTime, 5000); + source.close(); + t.done(); + }); +}, "retry field updates reconnectionTime"); + +async_test(function(t) { + const source = EventSource.fromReadableStream( + streamFromString("retry: bogus\ndata: hello\n\n") + ); + var initialTime = source.reconnectionTime; + source.onmessage = t.step_func(function(e) { + assert_equals(source.reconnectionTime, initialTime); + source.close(); + t.done(); + }); +}, "retry field with non-digit value is ignored"); + +// --- close() --- + +test(function() { + const source = EventSource.fromReadableStream(streamFromString("data: hello\n\n")); + source.close(); + assert_equals(source.readyState, EventSource.CLOSED); +}, "close() sets readyState to CLOSED"); + +// --- Stream done --- + +async_test(function(t) { + const source = EventSource.fromReadableStream( + streamFromString("data: hello\n\n") + ); + let gotMessage = false; + source.onmessage = t.step_func(function(e) { + assert_equals(e.data, "hello"); + gotMessage = true; + }); + source.onerror = t.step_func(function(e) { + assert_true(gotMessage, "message should be received before error"); + assert_equals(source.readyState, EventSource.CLOSED); + t.done(); + }); +}, "stream done fires error after dispatching complete events"); + +async_test(function(t) { + const source = EventSource.fromReadableStream( + streamFromString("") + ); + source.onerror = t.step_func(function(e) { + assert_equals(source.readyState, EventSource.CLOSED); + t.done(); + }); +}, "empty stream fires error and transitions to CLOSED"); + +// --- Stream error --- + +async_test(function(t) { + const source = EventSource.fromReadableStream( + streamThatErrors(new Error("stream error")) + ); + source.onerror = t.step_func(function(e) { + assert_equals(source.readyState, EventSource.CLOSED); + t.done(); + }); +}, "stream error fires error event and transitions to CLOSED"); + +// --- No reconnection --- + +async_test(function(t) { + const source = EventSource.fromReadableStream( + streamFromString("data: hello\n\n") + ); + let errorCount = 0; + source.onerror = t.step_func(function(e) { + errorCount++; + assert_equals(source.readyState, EventSource.CLOSED); + // Wait to verify no reconnection attempt occurs + t.step_timeout(function() { + assert_equals(errorCount, 1, "should only get one error event"); + assert_equals(source.readyState, EventSource.CLOSED); + t.done(); + }, 200); + }); +}, "no reconnection is attempted after stream done"); + +// --- Newline formats --- + +async_test(function(t) { + const source = EventSource.fromReadableStream( + streamFromString("data: cr\r\rdata: lf\n\ndata: crlf\r\n\r\n") + ); + const messages = []; + source.onmessage = t.step_func(function(e) { + messages.push(e.data); + if (messages.length === 3) { + assert_equals(messages[0], "cr"); + assert_equals(messages[1], "lf"); + assert_equals(messages[2], "crlf"); + source.close(); + t.done(); + } + }); +}, "handles CR, LF, and CRLF line endings"); + +// --- BOM handling --- + +async_test(function(t) { + const bom = new Uint8Array([0xEF, 0xBB, 0xBF]); + const data = new TextEncoder().encode("data: hello\n\n"); + const combined = new Uint8Array(bom.length + data.length); + combined.set(bom); + combined.set(data, bom.length); + const stream = new ReadableStream({ + start(controller) { + controller.enqueue(combined); + controller.close(); + } + }); + const source = EventSource.fromReadableStream(stream); + source.onmessage = t.step_func(function(e) { + assert_equals(e.data, "hello"); + source.close(); + t.done(); + }); +}, "strips leading UTF-8 BOM"); + +// --- TransformStream reconnection pattern --- + +async_test(function(t) { + const ts = new TransformStream(); + const source = EventSource.fromReadableStream(ts.readable); + const messages = []; + + source.onmessage = t.step_func(function(e) { + messages.push(e.data); + if (messages.length === 2) { + assert_equals(messages[0], "first"); + assert_equals(messages[1], "second"); + // Close the writable side to end the test + ts.writable.close(); + } + }); + + source.onerror = t.step_func(function() { + assert_equals(messages.length, 2); + t.done(); + }); + + // Simulate two "connections" piped through the TransformStream + const first = streamFromString("data: first\n\n"); + const second = streamFromString("data: second\n\n"); + + first.pipeTo(ts.writable, { preventClose: true }).then(function() { + return second.pipeTo(ts.writable, { preventClose: false }); + }); +}, "TransformStream pattern allows multiple streams to be piped through"); + +async_test(function(t) { + const ts = new TransformStream(); + const source = EventSource.fromReadableStream(ts.readable); + + source.onmessage = t.step_func(function(e) { + if (e.data === "after-reconnect") { + assert_equals(source.lastEventId, "42"); + source.close(); + t.done(); + } + }); + + const first = streamFromString("id: 42\ndata: first\n\n"); + const second = streamFromString("data: after-reconnect\n\n"); + + first.pipeTo(ts.writable, { preventClose: true }).then(function() { + // lastEventId should persist across "reconnections" + assert_equals(source.lastEventId, "42"); + return second.pipeTo(ts.writable, { preventClose: true }); + }); +}, "lastEventId persists across reconnections via TransformStream"); + +async_test(function(t) { + const ts = new TransformStream(); + const source = EventSource.fromReadableStream(ts.readable); + + source.onmessage = t.step_func(function(e) { + if (e.data === "after-reconnect") { + assert_equals(source.reconnectionTime, 3000); + source.close(); + t.done(); + } + }); + + const first = streamFromString("retry: 3000\ndata: first\n\n"); + const second = streamFromString("data: after-reconnect\n\n"); + + first.pipeTo(ts.writable, { preventClose: true }).then(function() { + assert_equals(source.reconnectionTime, 3000); + return second.pipeTo(ts.writable, { preventClose: true }); + }); +}, "reconnectionTime persists across reconnections via TransformStream");