diff --git a/server/api/src/__tests__/routes/notes/events.test.ts b/server/api/src/__tests__/routes/notes/events.test.ts new file mode 100644 index 00000000..98c8bbf7 --- /dev/null +++ b/server/api/src/__tests__/routes/notes/events.test.ts @@ -0,0 +1,359 @@ +/** + * `GET /api/notes/:noteId/events` 統合テスト (Issue #860 Phase 4)。 + * + * Hono の `streamSSE` 経由で SSE をストリーミングするため、`app.request` の戻り + * `Response` から `body.getReader()` でチャンクを読みつつ、`ready` イベントと + * 続けて publish したイベントが期待した順番で流れることを検証する。 + * + * Integration tests for `GET /api/notes/:noteId/events` (Issue #860 Phase 4). + * Streams the SSE body via `Response.body.getReader()` and asserts that the + * `ready` event and subsequently published events arrive in order. Mirrors + * the test style of `pages.test.ts` for the auth/mock-DB shape. + * + * @see ../../../routes/notes/events.ts + * @see https://github.com/otomatty/zedi/issues/860 + */ +import { afterEach, describe, expect, it, vi } from "vitest"; +import type { Context, Next } from "hono"; +import type { AppEnv } from "../../../types/index.js"; + +vi.mock("../../../middleware/auth.js", () => ({ + authRequired: async (c: Context, next: Next) => { + const userId = c.req.header("x-test-user-id"); + const userEmail = c.req.header("x-test-user-email"); + if (!userId) return c.json({ message: "Unauthorized" }, 401); + c.set("userId", userId); + if (userEmail) c.set("userEmail", userEmail); + await next(); + }, + authOptional: async (c: Context, next: Next) => { + const userId = c.req.header("x-test-user-id"); + const userEmail = c.req.header("x-test-user-email"); + if (userId) c.set("userId", userId); + if (userEmail) c.set("userEmail", userEmail); + await next(); + }, +})); + +import { + TEST_USER_ID, + OTHER_USER_ID, + TEST_USER_EMAIL, + createMockNote, + createTestApp, + authHeaders, +} from "./setup.js"; +import { + clearNoteEventSubscribers, + noteEventSubscriberCount, + publishNoteEvent, + type NoteEvent, +} from "../../../services/noteEventBroadcaster.js"; + +const NOTE_ID = "note-test-001"; + +afterEach(() => { + clearNoteEventSubscribers(); +}); + +/** + * `Response.body` を 1 度だけ getReader() するためのラッパ。テスト中に同じ + * body から複数回フレーム取得したい用途で使う。`close()` でロック解放と + * 購読解除(サーバ abort)を起こす。 + * + * Helper that locks `response.body` exactly once with `getReader()` and lets + * callers pull SSE frames incrementally. Releasing the reader via `close()` + * triggers the server-side `stream.onAbort` so the subscriber count drops. + */ +function openSseReader(response: Response, timeoutMs = 1_000) { + const body = response.body; + if (!body) throw new Error("Response.body is null"); + const reader = body.getReader(); + const decoder = new TextDecoder(); + let buffer = ""; + let closed = false; + + /** + * Pulls up to `count` frames (each ending in `\n\n`). Keep-alive comment + * frames are skipped so tests can focus on real events. + * `count` 件分のフレーム(`\n\n` 区切り)を取得する。`:` で始まるコメント行 + * (キープアライブ)はスキップする。 + */ + async function readFrames(count: number): Promise { + const frames: string[] = []; + const start = Date.now(); + while (frames.length < count) { + if (Date.now() - start > timeoutMs) break; + const readPromise = reader.read(); + const timeoutPromise = new Promise<{ done: true; value: undefined }>((resolve) => + setTimeout(() => resolve({ done: true, value: undefined }), timeoutMs), + ); + const { value, done } = (await Promise.race([readPromise, timeoutPromise])) as { + value?: Uint8Array; + done?: boolean; + }; + if (done || !value) break; + buffer += decoder.decode(value, { stream: true }); + while (buffer.includes("\n\n")) { + const idx = buffer.indexOf("\n\n"); + const frame = buffer.slice(0, idx); + buffer = buffer.slice(idx + 2); + const lines = frame.split("\n").filter((l) => !l.startsWith(":")); + if (lines.length === 0) continue; + frames.push(lines.join("\n")); + if (frames.length >= count) break; + } + } + return frames; + } + + async function close(): Promise { + if (closed) return; + closed = true; + await reader.cancel().catch(() => {}); + } + + return { readFrames, close }; +} + +/** + * 1 つの SSE フレーム文字列を `{ event, data }` にパースする。 + * Parse a single SSE frame string into its `event:` / `data:` fields. + */ +function parseSseFrame(frame: string): { event: string | null; data: string } { + let event: string | null = null; + let data = ""; + for (const line of frame.split("\n")) { + if (line.startsWith("event:")) event = line.slice(6).trim(); + else if (line.startsWith("data:")) data = line.slice(5).trim(); + } + return { event, data }; +} + +describe("GET /api/notes/:noteId/events", () => { + it("returns 404 when the note is missing", async () => { + // findActiveNoteById が空配列を返すパス。 + // `findActiveNoteById` returns an empty result, so the route 404s. + const { app } = createTestApp([[]]); + + const res = await app.request(`/api/notes/${NOTE_ID}/events`, { + method: "GET", + headers: authHeaders(), + }); + + expect(res.status).toBe(404); + }); + + it("returns 403 when the caller has no role for the note", async () => { + // private ノートを別ユーザーが要求 → guest にもならず role = null。 + // Private note + non-owner caller → role resolves to null → 403. + const mockNote = createMockNote({ ownerId: OTHER_USER_ID, visibility: "private" }); + const { app } = createTestApp([[mockNote], [], []]); + + const res = await app.request(`/api/notes/${NOTE_ID}/events`, { + method: "GET", + headers: authHeaders(TEST_USER_ID, TEST_USER_EMAIL), + }); + + expect(res.status).toBe(403); + }); + + it("delivers the ready hello followed by published events to the subscriber", async () => { + const mockNote = createMockNote(); + const { app } = createTestApp([[mockNote]]); + + const res = await app.request(`/api/notes/${NOTE_ID}/events`, { + method: "GET", + headers: authHeaders(), + }); + + expect(res.status).toBe(200); + expect(res.headers.get("content-type") ?? "").toContain("text/event-stream"); + + const sse = openSseReader(res); + try { + // `ready` を待ってから publish する。streamSSE の初期 writeSSE が flush + // されるのは subscribe より僅かに後なので、最初のフレームを読み取ってから + // publish することでテストが安定する。 + // Read the initial `ready` frame first so we know subscription is wired, + // then publish; otherwise the publish can race the subscribe registration. + const readyOnly = await sse.readFrames(1); + expect(readyOnly).toHaveLength(1); + const ready = parseSseFrame(readyOnly[0] ?? ""); + expect(ready.event).toBe("ready"); + expect(JSON.parse(ready.data)).toEqual({ note_id: NOTE_ID }); + + const addedEvent: NoteEvent = { + type: "page.added", + note_id: NOTE_ID, + page: { + id: "pg-new", + owner_id: TEST_USER_ID, + note_id: NOTE_ID, + source_page_id: null, + title: "Hello", + content_preview: "preview", + thumbnail_url: null, + source_url: null, + created_at: new Date("2026-05-13T00:00:00Z"), + updated_at: new Date("2026-05-13T00:00:00Z"), + is_deleted: false, + }, + }; + publishNoteEvent(addedEvent); + + const moreFrames = await sse.readFrames(1); + expect(moreFrames).toHaveLength(1); + const added = parseSseFrame(moreFrames[0] ?? ""); + expect(added.event).toBe("page.added"); + const addedData = JSON.parse(added.data) as NoteEvent; + expect(addedData.type).toBe("page.added"); + expect(addedData.note_id).toBe(NOTE_ID); + if (addedData.type === "page.added") { + expect(addedData.page.id).toBe("pg-new"); + expect(addedData.page.title).toBe("Hello"); + } + } finally { + await sse.close(); + } + }); + + it("subscribes the caller for the duration of the stream and cleans up on cancel", async () => { + const mockNote = createMockNote(); + const { app } = createTestApp([[mockNote]]); + + expect(noteEventSubscriberCount(NOTE_ID)).toBe(0); + const res = await app.request(`/api/notes/${NOTE_ID}/events`, { + method: "GET", + headers: authHeaders(), + }); + + const sse = openSseReader(res); + // Wait for `ready` so subscription is definitely registered before we + // assert the count. + await sse.readFrames(1); + expect(noteEventSubscriberCount(NOTE_ID)).toBe(1); + + // Cancelling the reader triggers the abort path on the server side. + await sse.close(); + // Allow the abort microtasks to run. + await new Promise((r) => setTimeout(r, 20)); + expect(noteEventSubscriberCount(NOTE_ID)).toBe(0); + }); + + it("subscribes before sending ready so events published during the handshake are delivered (Codex P2 / coderabbitai)", async () => { + // Issue #860 Phase 4 リグレッションテスト: 以前は ready の前に subscribe + // していなかったため、ready 直前に publish されたイベントが取りこぼれて + // いた。修正後は subscribe → ready の順なので、handler 起動直後に + // publish しても ready の後にちゃんと届く。 + // + // Regression test for the subscribe-before-ready fix. We can't observe + // the exact instant between subscribe and ready from outside Hono, but + // we can prove the invariant by publishing AFTER the subscribe count + // becomes 1 (i.e. handler entered) but BEFORE reading any frame. If + // ready were emitted first, the publish would race the subscribe and + // be lost on slow scheduling; with the fix the event is buffered into + // the SSE writer and arrives right after `ready`. + const mockNote = createMockNote(); + const { app } = createTestApp([[mockNote]]); + + const res = await app.request(`/api/notes/${NOTE_ID}/events`, { + method: "GET", + headers: authHeaders(), + }); + + // Poll until the subscriber is registered (handler entered, subscribe + // succeeded). At this point ready has not been read yet from our side, + // but on the server it was written AFTER subscribe. + for (let i = 0; i < 50; i++) { + if (noteEventSubscriberCount(NOTE_ID) === 1) break; + await new Promise((r) => setTimeout(r, 5)); + } + expect(noteEventSubscriberCount(NOTE_ID)).toBe(1); + + publishNoteEvent({ + type: "page.deleted", + note_id: NOTE_ID, + page_id: "pg-during-handshake", + }); + + const sse = openSseReader(res); + try { + const frames = await sse.readFrames(2); + expect(frames).toHaveLength(2); + const ready = parseSseFrame(frames[0] ?? ""); + const deleted = parseSseFrame(frames[1] ?? ""); + expect(ready.event).toBe("ready"); + expect(deleted.event).toBe("page.deleted"); + expect(JSON.parse(deleted.data)).toMatchObject({ + type: "page.deleted", + note_id: NOTE_ID, + page_id: "pg-during-handshake", + }); + } finally { + await sse.close(); + } + }); + + it("closes the stream after delivering note.permission_changed (Codex P1 / coderabbitai)", async () => { + // Issue #860 Phase 4: 権限変化を受け取ったクライアントは EventSource + // 経由で再接続して `getNoteRole` を再評価する必要がある。サーバ側で + // ストリームを閉じることで、剥奪済みユーザーには後続の page.* イベントが + // 届かない。 + // + // After delivering `note.permission_changed`, the server proactively + // closes the stream so a revoked caller stops receiving subsequent + // events. Verified by checking the subscriber count drops to zero and + // the read loop hits EOF. + const mockNote = createMockNote(); + const { app } = createTestApp([[mockNote]]); + + const res = await app.request(`/api/notes/${NOTE_ID}/events`, { + method: "GET", + headers: authHeaders(), + }); + + const sse = openSseReader(res); + try { + // Wait for ready, then publish a permission change. + const ready = await sse.readFrames(1); + expect(parseSseFrame(ready[0] ?? "").event).toBe("ready"); + + publishNoteEvent({ type: "note.permission_changed", note_id: NOTE_ID }); + + const permission = await sse.readFrames(1); + expect(parseSseFrame(permission[0] ?? "").event).toBe("note.permission_changed"); + + // Stream is closing on the server side; give it a moment then assert + // the subscriber slot was released. + for (let i = 0; i < 50; i++) { + if (noteEventSubscriberCount(NOTE_ID) === 0) break; + await new Promise((r) => setTimeout(r, 5)); + } + expect(noteEventSubscriberCount(NOTE_ID)).toBe(0); + } finally { + await sse.close(); + } + }); + + it("allows guest access on public notes (authOptional)", async () => { + const mockNote = createMockNote({ ownerId: OTHER_USER_ID, visibility: "public" }); + const { app } = createTestApp([[mockNote]]); + + const res = await app.request(`/api/notes/${NOTE_ID}/events`, { + method: "GET", + headers: { "Content-Type": "application/json" }, + }); + + expect(res.status).toBe(200); + const sse = openSseReader(res); + try { + const frames = await sse.readFrames(1); + expect(frames).toHaveLength(1); + const ready = parseSseFrame(frames[0] ?? ""); + expect(ready.event).toBe("ready"); + } finally { + await sse.close(); + } + }); +}); diff --git a/server/api/src/__tests__/routes/notes/pages.test.ts b/server/api/src/__tests__/routes/notes/pages.test.ts index 72517b72..7cbfbf81 100644 --- a/server/api/src/__tests__/routes/notes/pages.test.ts +++ b/server/api/src/__tests__/routes/notes/pages.test.ts @@ -2,7 +2,7 @@ * ノートページ管理ルートのテスト(Issue #823: pages.note_id 直接モデル) * Tests for note page routes after issue #823 (`pages.note_id` ownership). */ -import { describe, it, expect, vi } from "vitest"; +import { describe, it, expect, vi, afterEach } from "vitest"; import type { Context, Next } from "hono"; import type { AppEnv } from "../../../types/index.js"; @@ -32,6 +32,15 @@ import { createTestApp, authHeaders, } from "./setup.js"; +import { + clearNoteEventSubscribers, + subscribeNoteEvents, + type NoteEvent, +} from "../../../services/noteEventBroadcaster.js"; + +afterEach(() => { + clearNoteEventSubscribers(); +}); const NOTE_ID = "note-test-001"; @@ -145,6 +154,59 @@ describe("POST /api/notes/:noteId/pages", () => { expect(res.status).toBe(403); }); + + it("publishes page.added to note subscribers (Issue #860 Phase 4)", async () => { + // POST 成功時にノート購読者へ `page.added` が配信される。SSE ルートを介さず + // ブロードキャスタ直接購読で検証することで、emit 経路だけを切り出してテスト + // する。本番経路ではこの listener が SSE writer に変わる。 + // + // POST success must fan out a `page.added` event to note subscribers. + // Bypassing the SSE route and subscribing the broadcaster directly lets + // the test focus on the publish call-site without coupling to the SSE + // transport. In production the listener is the SSE writer. + const mockNote = createMockNote(); + const newPageRow = { + id: "pg-created", + ownerId: TEST_USER_ID, + noteId: NOTE_ID, + sourcePageId: null, + title: "Emitted Page", + contentPreview: "preview body", + thumbnailUrl: "https://cdn.example/p.jpg", + sourceUrl: null, + createdAt: new Date("2026-05-13T00:00:00Z"), + updatedAt: new Date("2026-05-13T00:00:00Z"), + isDeleted: false, + }; + const { app } = createTestApp([[mockNote], [newPageRow], []]); + + const received: NoteEvent[] = []; + subscribeNoteEvents(NOTE_ID, (event) => { + received.push(event); + }); + + const res = await app.request(`/api/notes/${NOTE_ID}/pages`, { + method: "POST", + headers: authHeaders(), + body: JSON.stringify({ title: "Emitted Page" }), + }); + + expect(res.status).toBe(200); + expect(received).toHaveLength(1); + const event = received[0]; + expect(event?.type).toBe("page.added"); + if (event?.type === "page.added") { + expect(event.note_id).toBe(NOTE_ID); + expect(event.page.id).toBe("pg-created"); + expect(event.page.title).toBe("Emitted Page"); + // SSE 経路では preview / thumbnail を常に同梱する(フロントの + // useInfiniteNotePages デフォルト include と整合させるため)。 + // The SSE channel always carries preview/thumbnail to stay consistent + // with the frontend's `useInfiniteNotePages` default include set. + expect(event.page.content_preview).toBe("preview body"); + expect(event.page.thumbnail_url).toBe("https://cdn.example/p.jpg"); + } + }); }); describe("GET /api/notes/:noteId/pages (Issue #860 Phase 1 cursor window)", () => { @@ -415,6 +477,55 @@ describe("DELETE /api/notes/:noteId/pages/:pageId", () => { expect(updates.length).toBe(2); }); + it("publishes page.deleted to note subscribers (Issue #860 Phase 4)", async () => { + // DELETE 成功時にノート購読者へ `page.deleted` が配信され、client は + // 該当 id を window から落とすだけで済む。 + // DELETE success fans out a `page.deleted` event so subscribers can drop + // the page id from their cached windows. + const mockNote = createMockNote(); + const pageId = "pg-del-emit"; + const { app } = createTestApp([[mockNote], [{ id: pageId, noteId: NOTE_ID }], [], []]); + + const received: NoteEvent[] = []; + subscribeNoteEvents(NOTE_ID, (event) => { + received.push(event); + }); + + const res = await app.request(`/api/notes/${NOTE_ID}/pages/${pageId}`, { + method: "DELETE", + headers: authHeaders(), + }); + + expect(res.status).toBe(200); + expect(received).toHaveLength(1); + expect(received[0]).toEqual({ + type: "page.deleted", + note_id: NOTE_ID, + page_id: pageId, + }); + }); + + it("does not publish when DELETE fails (page belongs to other note)", async () => { + // 400 で落ちる経路では SSE 購読者へイベントを流さない(DB は更新されていない + // ため)。整合性ずれの早期検出として明示的に検証する。 + // The 400 path must not emit since no DB mutation happened. Verified + // explicitly so a future refactor cannot regress this invariant silently. + const mockNote = createMockNote(); + const pageId = "pg-other-note"; + const { app } = createTestApp([[mockNote], [{ id: pageId, noteId: "other-note-id" }]]); + + const received: NoteEvent[] = []; + subscribeNoteEvents(NOTE_ID, (event) => received.push(event)); + + const res = await app.request(`/api/notes/${NOTE_ID}/pages/${pageId}`, { + method: "DELETE", + headers: authHeaders(), + }); + + expect(res.status).toBe(400); + expect(received).toHaveLength(0); + }); + it("returns 400 when page belongs to another note", async () => { const mockNote = createMockNote(); const pageId = "pg-other-note"; diff --git a/server/api/src/__tests__/routes/pages.test.ts b/server/api/src/__tests__/routes/pages.test.ts index 4bcb0636..cf674479 100644 --- a/server/api/src/__tests__/routes/pages.test.ts +++ b/server/api/src/__tests__/routes/pages.test.ts @@ -350,16 +350,26 @@ describe("PUT /api/pages/:id/content", () => { // Issue #726: タイトル変更検出のため、PUT に title が含まれるとき pages.title // を SELECT してから UPDATE を行う。これにより伝播処理の起点になる。 + // Issue #860 Phase 4 (PR #867 review fix) で、メタデータが実際に変化したとき + // だけ pages テーブルを UPDATE する最適化が入った。タイトルが変わるケースは + // 引き続き SELECT + UPDATE の 2 段ステップを踏む。 + // // Issue #726: when PUT carries `title`, the route SELECTs the current // `pages.title` before UPDATE so the handler can detect a rename and - // trigger background propagation. - it("issues an extra SELECT for rename detection when body.title is provided", async () => { + // trigger background propagation. Issue #860 Phase 4 (PR #867 review) + // additionally gates the metadata UPDATE on a real value diff; when the + // title genuinely changes the SELECT + UPDATE pair is still issued. + it("issues an extra SELECT and pages UPDATE for rename detection when body.title differs", async () => { const ydocB64 = Buffer.from("hello").toString("base64"); const { app, chains } = createPagesAppWithChains([ ...pageAccessPrefix(), [{ version: 2, pageId: PAGE_ID }], - [{ title: "Same Title" }], - [], + // 現在のタイトルが "Old Title"、新タイトルが "New Title" なので + // SELECT + UPDATE が実際に走る。 + // Current title "Old Title" ≠ new title "New Title", so the SELECT + // detects a rename and the metadata UPDATE actually fires. + [{ title: "Old Title", contentPreview: null }], + [{ id: PAGE_ID, title: "New Title" }], [], [], ]); @@ -370,7 +380,7 @@ describe("PUT /api/pages/:id/content", () => { body: JSON.stringify({ ydoc_state: ydocB64, expected_version: 1, - title: "Same Title", + title: "New Title", }), }); @@ -383,4 +393,44 @@ describe("PUT /api/pages/:id/content", () => { const updateChains = chains.filter((c) => c.startMethod === "update"); expect(updateChains.length).toBeGreaterThanOrEqual(2); }); + + // Issue #860 Phase 4 (PR #867 review): クライアントが現在値を round-trip した + // だけの保存では、SSE が暴発しないように pages テーブルの UPDATE を skip する。 + // SELECT 1 回(現在値の取得)は走るが、metadata UPDATE は走らない。 + // + // When the client round-trips unchanged title / content_preview values, + // the route now skips the metadata UPDATE entirely so `page.updated` is + // not broadcast on a no-op save. + it("skips the pages metadata UPDATE when title/content_preview match current (PR #867)", async () => { + const ydocB64 = Buffer.from("hello").toString("base64"); + const { app, chains } = createPagesAppWithChains([ + ...pageAccessPrefix(), + [{ version: 2, pageId: PAGE_ID }], + [{ title: "Same Title", contentPreview: "Same Preview" }], + [], + [], + ]); + + const res = await app.request(`/api/pages/${PAGE_ID}/content`, { + method: "PUT", + headers: authHeaders(), + body: JSON.stringify({ + ydoc_state: ydocB64, + expected_version: 1, + title: "Same Title", + content_preview: "Same Preview", + }), + }); + + expect(res.status).toBe(200); + // SELECT は現在値取得のため 1 回走るが、pages の UPDATE は走らない。 + // page_contents の UPDATE は走る (version bump)。 + // SELECT for current values still happens; the pages UPDATE is + // skipped, while page_contents still updates (version bump). + const updateChains = chains.filter((c) => c.startMethod === "update"); + // ノートアクセス可否を見るための SELECT が起点となる UPDATE も無いため、 + // 残るのは page_contents の version bump 1 件のみ。 + // The only UPDATE left is the page_contents version bump. + expect(updateChains.length).toBe(1); + }); }); diff --git a/server/api/src/__tests__/services/noteEventBroadcaster.test.ts b/server/api/src/__tests__/services/noteEventBroadcaster.test.ts new file mode 100644 index 00000000..083f6f1a --- /dev/null +++ b/server/api/src/__tests__/services/noteEventBroadcaster.test.ts @@ -0,0 +1,187 @@ +/** + * `noteEventBroadcaster` 単体テスト。 + * subscribe / publish / capacity / unsubscribe の挙動を検証する。 + * + * Unit tests for the in-memory `noteEventBroadcaster` (subscribe, publish, + * capacity guard, unsubscribe cleanup). Mirrors `apiErrorBroadcaster.test.ts` + * style but partitions listeners by `noteId` so an event for note A is not + * delivered to subscribers of note B. + * + * @see ../../services/noteEventBroadcaster.ts + * @see https://github.com/otomatty/zedi/issues/860 + */ +import { afterEach, describe, expect, it, vi } from "vitest"; +import { + NOTE_EVENT_STREAM_MAX_SUBSCRIBERS, + NoteEventStreamCapacityExceededError, + clearNoteEventSubscribers, + noteEventSubscriberCount, + publishNoteEvent, + subscribeNoteEvents, + type NoteEvent, +} from "../../services/noteEventBroadcaster.js"; + +const NOTE_A = "00000000-0000-4000-8000-00000000000a"; +const NOTE_B = "00000000-0000-4000-8000-00000000000b"; + +function makeAddedEvent(noteId: string, pageId = "pg-1"): NoteEvent { + return { + type: "page.added", + note_id: noteId, + page: { + id: pageId, + owner_id: "owner-1", + note_id: noteId, + source_page_id: null, + title: "New page", + content_preview: null, + thumbnail_url: null, + source_url: null, + created_at: new Date("2026-05-13T00:00:00Z"), + updated_at: new Date("2026-05-13T00:00:00Z"), + is_deleted: false, + }, + }; +} + +afterEach(() => { + clearNoteEventSubscribers(); +}); + +describe("noteEventBroadcaster", () => { + it("delivers published events to every subscriber for the same note", () => { + const a = vi.fn(); + const b = vi.fn(); + subscribeNoteEvents(NOTE_A, a); + subscribeNoteEvents(NOTE_A, b); + + const event = makeAddedEvent(NOTE_A); + publishNoteEvent(event); + + expect(a).toHaveBeenCalledTimes(1); + expect(a).toHaveBeenCalledWith(event); + expect(b).toHaveBeenCalledTimes(1); + expect(b).toHaveBeenCalledWith(event); + }); + + it("does not deliver events to subscribers of a different note", () => { + // ノートごとに購読を分離することが本ブロードキャスタの存在理由。`note_id` + // が違うイベントは決して別ノートの購読者へ漏れてはいけない。 + // Note partitioning is the whole point of this broadcaster: events for one + // note must never leak to subscribers of another. + const onA = vi.fn(); + const onB = vi.fn(); + subscribeNoteEvents(NOTE_A, onA); + subscribeNoteEvents(NOTE_B, onB); + + publishNoteEvent(makeAddedEvent(NOTE_A)); + + expect(onA).toHaveBeenCalledTimes(1); + expect(onB).not.toHaveBeenCalled(); + }); + + it("stops delivering after unsubscribe and trims the bucket", () => { + const listener = vi.fn(); + const unsubscribe = subscribeNoteEvents(NOTE_A, listener); + unsubscribe(); + + publishNoteEvent(makeAddedEvent(NOTE_A)); + + expect(listener).not.toHaveBeenCalled(); + expect(noteEventSubscriberCount(NOTE_A)).toBe(0); + expect(noteEventSubscriberCount()).toBe(0); + }); + + it("isolates a throwing subscriber from the rest", () => { + const consoleErr = vi.spyOn(console, "error").mockImplementation(() => {}); + const bad = vi.fn(() => { + throw new Error("boom"); + }); + const good = vi.fn(); + subscribeNoteEvents(NOTE_A, bad); + subscribeNoteEvents(NOTE_A, good); + + publishNoteEvent(makeAddedEvent(NOTE_A)); + + expect(bad).toHaveBeenCalledTimes(1); + expect(good).toHaveBeenCalledTimes(1); + expect(consoleErr).toHaveBeenCalled(); + consoleErr.mockRestore(); + }); + + it("rejects subscribe past the cap", () => { + // 上限まで埋めて、もう 1 つ subscribe しようとすると例外で弾かれる。 + // Fill to the cap; the next subscribe must throw the capacity error so the + // SSE route can map it to a 503 instead of silently dropping events. + for (let i = 0; i < NOTE_EVENT_STREAM_MAX_SUBSCRIBERS; i++) { + subscribeNoteEvents(NOTE_A, () => {}); + } + expect(() => subscribeNoteEvents(NOTE_A, () => {})).toThrow( + NoteEventStreamCapacityExceededError, + ); + expect(noteEventSubscriberCount()).toBe(NOTE_EVENT_STREAM_MAX_SUBSCRIBERS); + }); + + it("counts active subscribers globally and per-note", () => { + expect(noteEventSubscriberCount()).toBe(0); + expect(noteEventSubscriberCount(NOTE_A)).toBe(0); + + const u1 = subscribeNoteEvents(NOTE_A, () => {}); + const u2 = subscribeNoteEvents(NOTE_A, () => {}); + const u3 = subscribeNoteEvents(NOTE_B, () => {}); + + expect(noteEventSubscriberCount()).toBe(3); + expect(noteEventSubscriberCount(NOTE_A)).toBe(2); + expect(noteEventSubscriberCount(NOTE_B)).toBe(1); + + u1(); + u2(); + expect(noteEventSubscriberCount(NOTE_A)).toBe(0); + expect(noteEventSubscriberCount()).toBe(1); + u3(); + expect(noteEventSubscriberCount()).toBe(0); + }); + + it("treats a double-subscribe of the same listener as one slot (coderabbitai PR #867)", () => { + // `Set.add` は冪等なので、同じ listener を 2 回 subscribe しても bucket には + // 1 つしか入らない。totalSubscribers がそれに同期していないと、unsubscribe + // 1 回で計数だけが 1 残ってしまい capacity を誤って 503 にする恐れがある。 + // Subscribing the same listener twice must not inflate the counter — the + // Set holds at most one reference, so unsubscribe accounting has to match. + const listener = vi.fn(); + const unsubscribeA = subscribeNoteEvents(NOTE_A, listener); + const unsubscribeB = subscribeNoteEvents(NOTE_A, listener); + + expect(noteEventSubscriberCount()).toBe(1); + expect(noteEventSubscriberCount(NOTE_A)).toBe(1); + + // どちらの unsubscribe を呼んでも最終的に 0 に戻る。 + // Either unsubscribe collapses to 0 in the end. + unsubscribeA(); + expect(noteEventSubscriberCount()).toBe(0); + unsubscribeB(); + expect(noteEventSubscriberCount()).toBe(0); + + // 配信もたかが 1 回。 + // Dispatch only fires once. + subscribeNoteEvents(NOTE_A, listener); + publishNoteEvent(makeAddedEvent(NOTE_A)); + expect(listener).toHaveBeenCalledTimes(1); + }); + + it("delivers different event types (deleted, permission_changed)", () => { + // 別バリアントの discriminated union が無加工で listener に渡ること。 + // Other event variants flow through the same channel unchanged. + const listener = vi.fn(); + subscribeNoteEvents(NOTE_A, listener); + + const deleted: NoteEvent = { type: "page.deleted", note_id: NOTE_A, page_id: "pg-1" }; + const perm: NoteEvent = { type: "note.permission_changed", note_id: NOTE_A }; + + publishNoteEvent(deleted); + publishNoteEvent(perm); + + expect(listener).toHaveBeenNthCalledWith(1, deleted); + expect(listener).toHaveBeenNthCalledWith(2, perm); + }); +}); diff --git a/server/api/src/routes/notes/crud.ts b/server/api/src/routes/notes/crud.ts index 4a3bc954..5ffc67fc 100644 --- a/server/api/src/routes/notes/crud.ts +++ b/server/api/src/routes/notes/crud.ts @@ -34,6 +34,7 @@ import { getActivePageCounts, getActiveMemberCounts, } from "./helpers.js"; +import { publishNoteEvent } from "../../services/noteEventBroadcaster.js"; const ALLOWED_VISIBILITY = new Set(["private", "public", "unlisted", "restricted"]); const ALLOWED_EDIT_PERMISSION = new Set([ @@ -294,6 +295,23 @@ app.put("/:noteId", authRequired, async (c) => { const updatedNote = updated[0]; if (!updatedNote) throw new HTTPException(500, { message: "Failed to update note" }); + + // Issue #860 Phase 4: visibility / edit_permission の変化は `getNoteRole` + // の解釈に直結するため、ノート購読者へ sentinel を投げて details / window / + // members を invalidate させる。title だけの変更でも `noteRowToApi` の値が + // 変わるので一律で emit する。 + // Issue #860 Phase 4: changes to visibility / edit_permission flip the + // result of `getNoteRole` for some callers, so notify subscribers to + // re-evaluate access. Always emit on a successful PUT (even title-only + // changes) so the cached note shell does not drift. + if ( + visibility !== undefined || + editPermission !== undefined || + body.title !== undefined || + isOfficial !== undefined + ) { + publishNoteEvent({ type: "note.permission_changed", note_id: noteId }); + } return c.json(noteRowToApi(updatedNote)); }); diff --git a/server/api/src/routes/notes/eventHelpers.ts b/server/api/src/routes/notes/eventHelpers.ts new file mode 100644 index 00000000..11eb9103 --- /dev/null +++ b/server/api/src/routes/notes/eventHelpers.ts @@ -0,0 +1,47 @@ +/** + * ノートイベント emit 用のページ snapshot 整形ヘルパ (Issue #860 Phase 4)。 + * + * `publishNoteEvent` の `page.added` / `page.updated` ペイロードは + * `GET /api/notes/:noteId/pages` の {@link NotePageWindowItem} と同形にしておく + * ことで、フロント側の React Query infinite cache に `setQueriesData` で + * そのまま流し込める。drizzle の `.returning()` が返す camelCase 行を + * snake_case の wire 形式に揃える橋渡しがこのモジュールの責務。 + * + * Shapes a `pages` row (camelCase, as returned by drizzle `.returning()`) + * into the snake_case `NotePageWindowItem` consumed by the SSE channel and + * the windowed list endpoint (Issue #860 Phase 4). Keeping a single helper + * avoids drift between the publish path and the GET path. + * + * @see ../../services/noteEventBroadcaster.ts + * @see ./pages.ts + * @see ../pages.ts (`/api/pages` mirror routes) + */ +import type { Page } from "../../schema/index.js"; +import type { NotePageWindowItem } from "./types.js"; + +/** + * `pages` row(camelCase / drizzle 戻り値)を {@link NotePageWindowItem} に + * 変換する。SSE 経由のクライアントは `?include=preview,thumbnail` 相当の + * 全フィールドを期待するため、`content_preview` / `thumbnail_url` を + * そのまま積む。一覧 GET と異なり、ここで null マスクはしない。 + * + * Convert a `pages` row to the SSE wire snapshot. Unlike the windowed list + * which masks `content_preview` / `thumbnail_url` to `null` without an + * `?include=` token, the SSE feed always carries both because the + * `useInfiniteNotePages` default request set is `preview,thumbnail`. + */ +export function pageRowToWindowItem(row: Page): NotePageWindowItem { + return { + id: row.id, + owner_id: row.ownerId, + note_id: row.noteId, + source_page_id: row.sourcePageId ?? null, + title: row.title ?? null, + content_preview: row.contentPreview ?? null, + thumbnail_url: row.thumbnailUrl ?? null, + source_url: row.sourceUrl ?? null, + created_at: row.createdAt, + updated_at: row.updatedAt, + is_deleted: row.isDeleted, + }; +} diff --git a/server/api/src/routes/notes/events.ts b/server/api/src/routes/notes/events.ts new file mode 100644 index 00000000..b8669e78 --- /dev/null +++ b/server/api/src/routes/notes/events.ts @@ -0,0 +1,219 @@ +/** + * GET /api/notes/:noteId/events — ノート単位のページ変更イベントを SSE で配信する + * (Issue #860 Phase 4)。`authOptional` + {@link getNoteRole} の組み合わせにより、 + * 公開 / unlisted ノートでは未ログインの guest でもイベントを購読できる。 + * + * SSE feed for note-scoped page mutation events (Issue #860 Phase 4). The + * client subscribes via `EventSource('/api/notes/:noteId/events')` and applies + * each event to its `useInfiniteNotePages` React Query cache, avoiding a full + * window refetch on every mutation. Auth is `authOptional` so guests of + * public / unlisted notes can watch the feed; private / restricted notes still + * reject unknown callers with 403. + * + * イベント仕様 / Wire events: + * - `ready` : 接続確立直後の hello。`retry: 30000` 付き。 + * - `page.added` : `data` は {@link NoteEvent} JSON。 + * - `page.updated` : 同上。 + * - `page.deleted` : 同上 (`{ note_id, page_id }`)。 + * - `note.permission_changed` : 同上 (`{ note_id }` のみのセンチネル)。 + * + * 詳細は {@link ../../services/noteEventBroadcaster.ts} を参照。 + * + * @see ../../services/noteEventBroadcaster.ts + * @see ../admin/errors.ts (streamSSE / keep-alive / capacity check 元) + * @see https://github.com/otomatty/zedi/issues/860 + */ +import { Hono } from "hono"; +import { HTTPException } from "hono/http-exception"; +import { streamSSE } from "hono/streaming"; +import { authOptional } from "../../middleware/auth.js"; +import type { AppEnv } from "../../types/index.js"; +import { getNoteRole } from "./helpers.js"; +import { + NOTE_EVENT_STREAM_MAX_SUBSCRIBERS, + noteEventSubscriberCount, + subscribeNoteEvents, + type NoteEvent, +} from "../../services/noteEventBroadcaster.js"; + +const app = new Hono(); + +/** + * SSE 接続のキープアライブ間隔 (ms)。プロキシや LB がアイドル接続を切る前に + * コメント行を送って TCP を温存する。`routes/admin/errors.ts` と同値。 + * + * Keep-alive interval (ms) for SSE connections. Mirrors the value in + * `routes/admin/errors.ts` so proxies and LBs see a consistent heartbeat + * regardless of which feed the client subscribes to. + */ +const SSE_KEEPALIVE_MS = 25_000; + +/** + * `NoteEvent` を SSE wire 形式の `event` 名 + JSON `data` に整形する。 + * + * Serialize a {@link NoteEvent} into the SSE event name + JSON data pair + * consumed by `EventSource.addEventListener(name, ...)` on the client. + */ +function serializeNoteEvent(event: NoteEvent): { event: NoteEvent["type"]; data: string } { + return { event: event.type, data: JSON.stringify(event) }; +} + +/** + * GET /api/notes/:noteId/events + * + * - 404: ノートが存在しない / 削除済み。 + * - 403: ロール解決失敗(private / restricted を guest が要求した等)。 + * - 503: 購読者上限超過 ({@link NOTE_EVENT_STREAM_MAX_SUBSCRIBERS})。 + * - 200: `text/event-stream` を返し、`ready` イベントの直後から購読を開始する。 + * + * - 404: missing or soft-deleted note. + * - 403: caller has no role for the note (typical for guest hitting a private + * note). + * - 503: subscriber cap reached; the client must back off before retrying. + * - 200: SSE stream that emits a `ready` hello followed by `page.added` / + * `page.updated` / `page.deleted` / `note.permission_changed` events as they + * are published. + */ +app.get("/:noteId/events", authOptional, async (c) => { + const noteId = c.req.param("noteId"); + const userId = c.get("userId"); + const userEmail = c.get("userEmail"); + const db = c.get("db"); + + const { role, note } = await getNoteRole(noteId, userId, userEmail, db); + if (!note) throw new HTTPException(404, { message: "Note not found" }); + if (!role) throw new HTTPException(403, { message: "Forbidden" }); + + // 上限チェックは streamSSE を呼ぶ前に行う。`streamSSE` が走り出すと + // 200 + `text/event-stream` のレスポンスが既に開始されてしまい、503 へ + // 降格できないため。クライアント側は 503 を見て backoff する想定。 + // Reject capacity exhaustion BEFORE `streamSSE` commits the response. Once + // the helper sets `200 text/event-stream`, the status cannot be downgraded + // to 503, so the cap is checked up front and surfaced as a real HTTP error + // that EventSource will translate into a clean reconnect. + if (noteEventSubscriberCount() >= NOTE_EVENT_STREAM_MAX_SUBSCRIBERS) { + return c.json({ error: `subscriber cap reached (${NOTE_EVENT_STREAM_MAX_SUBSCRIBERS})` }, 503); + } + + return streamSSE( + c, + async (stream) => { + // Issue #860 Phase 4: subscribe BEFORE emitting `ready` so any event + // published in the gap between handshake and subscription is not lost. + // Codex P2 + coderabbitai PR #867 review. + // + // 購読登録を `ready` 送信の前に行い、両者の間に publish された + // イベントが取りこぼされないようにする。`ready` の前に subscribe する + // ことで、クライアントが `ready` を受け取った瞬間からはすべての + // イベントが届く契約を成立させる。 + // + // 上限はルート冒頭で確認済みのため通常 throw しないが、競合で同時に + // 限界を超えた場合に備えて catch して接続を畳む。 + // Capacity is verified above, but a concurrent subscribe could still + // exceed the cap; close cleanly if that race fires. + // Issue #860 Phase 4: `note.permission_changed` を受信したら、それを + // クライアントへ書き出した直後にストリームを閉じる。クライアントは + // EventSource の自動再接続経由で `getNoteRole` を再評価するので、 + // 既にアクセス権を失ったユーザーには次の page.* イベントが届かない + // (Codex P1 / coderabbitai critical on PR #867)。 + // + // After delivering `note.permission_changed`, proactively close the + // stream so the client reconnects through `getNoteRole` and a revoked + // caller stops receiving subsequent `page.*` events. Combined with the + // client-side rotation in `useNotePageEvents`, this gives instant + // revocation without a per-event re-auth (Codex P1 / coderabbitai + // critical on PR #867). + let unsubscribe: (() => void) | null = null; + try { + let writeChain: Promise = Promise.resolve(); + unsubscribe = subscribeNoteEvents(noteId, (event) => { + writeChain = writeChain + .then(async () => { + if (stream.aborted || stream.closed) return; + await stream.writeSSE(serializeNoteEvent(event)); + if (event.type === "note.permission_changed") { + // 配信直後に購読を解放しつつストリームを閉じる。`stream.close()` + // だけだと keep-alive ループの `await stream.sleep(...)` が + // 25 秒沈黙したままなので、明示的に unsubscribe しないと購読者 + // スロットが解放されない。 + // Release the subscriber slot eagerly: `stream.close()` alone + // doesn't wake the keep-alive `await stream.sleep(...)` loop, + // so the listener would linger until the sleep returns 25 s + // later. Explicit unsubscribe here matches the revocation + // requirement immediately. + unsubscribe?.(); + unsubscribe = null; + await stream.close(); + } + }) + .catch((err) => { + const message = err instanceof Error ? err.message : String(err); + console.error(`[note-events-stream] write failed: ${message}`); + }); + }); + } catch (err) { + const message = err instanceof Error ? err.message : String(err); + console.error(`[note-events-stream] subscribe failed: ${message}`); + await stream.writeSSE({ + event: "error", + data: JSON.stringify({ error: "subscribe failed" }), + }); + await stream.close(); + return; + } + + // 購読登録より後のすべて(ready 書き込み + keep-alive ループ)を + // try/finally で囲む。`stream.writeSSE("ready")` がクライアント切断中 + // などで throw した場合でも、finally で必ず購読を解放する。これを + // しないと subscriber slot がリークし、最終的に capacity cap (256) を + // 食い潰して 503 を出すようになる (coderabbitai major on PR #867)。 + // + // Wrap everything after subscribe registration in try/finally. If the + // initial `writeSSE("ready")` throws (e.g. client disconnected mid + // handshake), the finally guarantees the subscriber slot is freed. + // Without this guard a sequence of failed handshakes can exhaust the + // 256-subscriber cap and start rejecting healthy clients with 503 + // (coderabbitai major on PR #867). + stream.onAbort(() => { + unsubscribe?.(); + unsubscribe = null; + }); + + try { + // Subscription is live — now emit `ready`. From this point on the + // client can rely on receiving every event the server publishes. + await stream.writeSSE({ + event: "ready", + data: JSON.stringify({ note_id: noteId }), + retry: 30_000, + }); + + // クライアントが切断するまでキープアライブを送り続ける。生の SSE コメント + // 行(`:` 始まり)を書く: `writeSSE` だと `event:` フィールドが必ず付与され、 + // クライアントは「無名イベント」を受信してしまうため (PR #816 review)。 + // Heartbeat loop: emit a raw SSE comment line every SSE_KEEPALIVE_MS so + // idle proxies don't tear down the TCP connection. Using `write` (not + // `writeSSE`) keeps the line as a comment instead of a named event. + while (!stream.aborted && !stream.closed) { + await stream.sleep(SSE_KEEPALIVE_MS); + if (stream.aborted || stream.closed) break; + await stream.write(": ping\n\n"); + } + } finally { + unsubscribe?.(); + unsubscribe = null; + } + }, + async (err, stream) => { + const message = err instanceof Error ? err.message : String(err); + console.error(`[note-events-stream] handler error: ${message}`); + try { + await stream.writeSSE({ event: "error", data: JSON.stringify({ error: "stream error" }) }); + } catch { + /* swallow — connection likely closed */ + } + }, + ); +}); + +export default app; diff --git a/server/api/src/routes/notes/index.ts b/server/api/src/routes/notes/index.ts index 7c246d0b..c16ccd3c 100644 --- a/server/api/src/routes/notes/index.ts +++ b/server/api/src/routes/notes/index.ts @@ -10,6 +10,7 @@ import memberRoutes from "./members.js"; import inviteLinkRoutes from "./inviteLinks.js"; import domainAccessRoutes from "./domainAccess.js"; import searchRoutes from "./search.js"; +import eventsRoutes from "./events.js"; const app = new Hono(); @@ -22,5 +23,6 @@ app.route("/", memberRoutes); app.route("/", inviteLinkRoutes); app.route("/", domainAccessRoutes); app.route("/", searchRoutes); +app.route("/", eventsRoutes); export default app; diff --git a/server/api/src/routes/notes/members.ts b/server/api/src/routes/notes/members.ts index 22465385..ce54a986 100644 --- a/server/api/src/routes/notes/members.ts +++ b/server/api/src/routes/notes/members.ts @@ -21,6 +21,7 @@ import { sendInvitation, upsertInvitationTokenInDbThrowing, } from "../../services/invitationService.js"; +import { publishNoteEvent } from "../../services/noteEventBroadcaster.js"; const app = new Hono(); @@ -115,6 +116,12 @@ app.post("/:noteId/members", authRequired, async (c) => { invitationSent = true; } + // Issue #860 Phase 4: メンバー追加で `getNoteRole` の解釈が変わるので、購読者 + // に通知して details / window / members を再評価させる。 + // Issue #860 Phase 4: adding a member changes how `getNoteRole` resolves + // for that email, so notify subscribers to re-evaluate access. + publishNoteEvent({ type: "note.permission_changed", note_id: noteId }); + return c.json({ note_id: member.noteId, member_email: member.memberEmail, @@ -230,6 +237,13 @@ app.put("/:noteId/members/:memberEmail", authRequired, async (c) => { if (!updated) { throw new HTTPException(404, { message: "Member not found" }); } + + // Issue #860 Phase 4: ロール変更(viewer ↔ editor)でメンバーの編集権が + // 変わるため購読者に通知する。 + // Issue #860 Phase 4: role transitions flip `canEdit` for the affected + // member, so notify subscribers. + publishNoteEvent({ type: "note.permission_changed", note_id: noteId }); + return c.json({ note_id: updated.noteId, member_email: updated.memberEmail, @@ -255,6 +269,10 @@ app.delete("/:noteId/members/:memberEmail", authRequired, async (c) => { .set({ isDeleted: true, updatedAt: new Date() }) .where(and(eq(noteMembers.noteId, noteId), eq(noteMembers.memberEmail, memberEmail))); + // Issue #860 Phase 4: メンバー削除も `getNoteRole` の挙動を変えるため通知。 + // Issue #860 Phase 4: removing a member affects `getNoteRole`; notify too. + publishNoteEvent({ type: "note.permission_changed", note_id: noteId }); + return c.json({ removed: true }); }); diff --git a/server/api/src/routes/notes/pages.ts b/server/api/src/routes/notes/pages.ts index 7b4595bf..68843e25 100644 --- a/server/api/src/routes/notes/pages.ts +++ b/server/api/src/routes/notes/pages.ts @@ -16,6 +16,8 @@ import { authRequired, authOptional } from "../../middleware/auth.js"; import type { AppEnv } from "../../types/index.js"; import type { NotePageWindowItem, NotePageWindowResponse } from "./types.js"; import { getNoteRole, canEdit } from "./helpers.js"; +import { publishNoteEvent } from "../../services/noteEventBroadcaster.js"; +import { pageRowToWindowItem } from "./eventHelpers.js"; /** * `GET /api/notes/:noteId/pages` の最大ページサイズ。issue #860 Phase 1 で 100 件 @@ -225,6 +227,24 @@ app.post("/:noteId/pages", authRequired, async (c) => { return newPage; }); + // Issue #860 Phase 4: ノートを購読中のクライアント全員に `page.added` を配信 + // し、各 client の `useInfiniteNotePages` キャッシュへ直接 prepend させる + // ことで、window 全体の refetch を避ける。emit は DB tx 完了後に限り、 + // 失敗時の整合性ずれを防ぐ。`publishNoteEvent` 自身は throw しないので + // try/catch は不要。 + // + // Issue #860 Phase 4: notify every SSE subscriber for this note so the + // client can prepend the new page into its `useInfiniteNotePages` cache + // without refetching the whole window. The publish happens strictly after + // the transaction commits — emitting inside the tx could leak an event + // for a page that never lands. `publishNoteEvent` swallows listener + // failures internally, so no try/catch is needed here. + publishNoteEvent({ + type: "page.added", + note_id: noteId, + page: pageRowToWindowItem(created), + }); + return c.json({ created: true, page_id: created.id, @@ -267,6 +287,14 @@ app.delete("/:noteId/pages/:pageId", authRequired, async (c) => { await tx.update(notes).set({ updatedAt: new Date() }).where(eq(notes.id, noteId)); }); + // Issue #860 Phase 4: 削除をノート購読者へ通知。client は cache から + // 該当 id を取り除くだけで済む(全 window refetch は不要)。tx 完了後に + // emit する。 + // Issue #860 Phase 4: notify SSE subscribers so they can drop the page id + // from their cached windows without refetching. Emitted after the + // transaction commits to avoid announcing a delete that gets rolled back. + publishNoteEvent({ type: "page.deleted", note_id: noteId, page_id: pageId }); + return c.json({ removed: true }); }); diff --git a/server/api/src/routes/pages.ts b/server/api/src/routes/pages.ts index 7030afe1..ebd87bb5 100644 --- a/server/api/src/routes/pages.ts +++ b/server/api/src/routes/pages.ts @@ -14,7 +14,7 @@ import { Hono } from "hono"; import { HTTPException } from "hono/http-exception"; import { eq, and, sql } from "drizzle-orm"; -import { pages, pageContents } from "../schema/index.js"; +import { pages, pageContents, type Page } from "../schema/index.js"; import { authRequired } from "../middleware/auth.js"; import type { AppEnv, Database } from "../types/index.js"; import { ensureDefaultNote, getDefaultNoteOrNull } from "../services/defaultNoteService.js"; @@ -24,6 +24,8 @@ import { maybeCreateSnapshot } from "../services/snapshotService.js"; import { assertPageViewAccess, assertPageEditAccess } from "../services/pageAccessService.js"; import { propagateTitleRename } from "../services/titleRenamePropagationService.js"; import { deleteThumbnailObject } from "../services/thumbnailGcService.js"; +import { publishNoteEvent } from "../services/noteEventBroadcaster.js"; +import { pageRowToWindowItem } from "./notes/eventHelpers.js"; /** * ベストエフォートで自動スナップショットを作成する。失敗してもメイン処理には影響しない。 @@ -70,31 +72,85 @@ function tryPropagateTitleRename( }); } +/** + * Issue #860 Phase 4: PUT /content で title / content_preview が「実際に」変わった + * ときだけ `page.updated` をノート購読者へ配信する。クライアントが現在値を + * 毎回ラウンドトリップする実装でも spam しないよう、`applyPagesMetadataUpdate` + * の戻り値 `metadataChanged` で判定する。`updatedRow` も同じ helper の + * `.returning()` から渡るため、ここでは追加 SELECT を発生させない + * (gemini-code-assist + coderabbitai review on PR #867)。 + * + * Emit `page.updated` only when the metadata actually changed compared to the + * current row. A client that round-trips the unchanged values on every save + * must not trigger a broadcast — the helper's `metadataChanged` flag gates + * that. The `updatedRow` comes from `applyPagesMetadataUpdate`'s + * `.returning()`, so this path stays SELECT-free (gemini-code-assist and + * coderabbitai reviews on PR #867). + */ +function emitPageUpdatedIfChanged(metadataChanged: boolean, updatedRow: Page | null): void { + if (!metadataChanged || !updatedRow || updatedRow.isDeleted) return; + publishNoteEvent({ + type: "page.updated", + note_id: updatedRow.noteId, + page: pageRowToWindowItem(updatedRow), + }); +} + /** * PUT /content リクエストから pages テーブルの更新セットを構築し、変更があれば適用する。 * タイトル更新を検出した場合は旧タイトルを返して呼び出し側から伝播処理を * 起動できるようにする(issue #726)。 * + * Issue #860 Phase 4 で 2 つの戻り値を追加した: + * - `metadataChanged`: 現在値と比較して title または content_preview が + * 実際に変わったかどうか。クライアントが現在値をエコーバックするセーブ + * フローで SSE がスパムしないように、emit 側でこのフラグを参照する + * (coderabbitai major on PR #867)。 + * - `updatedRow`: `.returning()` の結果。emit 側で追加 SELECT せずに + * そのまま payload に流せる(gemini-code-assist medium on PR #867)。 + * * Build and apply pages-table updates (title, content_preview, updated_at) - * from the PUT body. When the title is being changed, return the old / new - * title pair so the caller can kick off rename propagation once the row - * update is durable (issue #726). + * from the PUT body. Returns: + * - `renamed` — old/new title pair when the title meaningfully changed + * (issue #726). + * - `metadataChanged` — whether title or content_preview actually differs + * from the current row. Used by the Issue #860 Phase 4 SSE emit to avoid + * broadcasting `page.updated` on round-tripped values + * (coderabbitai review on PR #867). + * - `updatedRow` — the post-update row returned by `.returning()` so the + * emit path does not need a follow-up SELECT + * (gemini-code-assist review on PR #867). */ async function applyPagesMetadataUpdate( db: { select: Database["select"]; update: Database["update"] }, pageId: string, body: { title?: string; content_preview?: string }, -): Promise<{ renamed: { oldTitle: string; newTitle: string } | null }> { +): Promise<{ + renamed: { oldTitle: string; newTitle: string } | null; + metadataChanged: boolean; + updatedRow: Page | null; +}> { let renamed: { oldTitle: string; newTitle: string } | null = null; - if (body.title !== undefined) { + // タイトル変化検知に加えて Phase 4 で content_preview の変化検知も必要 + // なので、両方をまとめて 1 回の SELECT で取り出す。 + // Title (for rename propagation) and content_preview (for SSE emit + // gating) are both compared against the current row, so fetch them in a + // single SELECT instead of two. + let currentTitle: string | null = null; + let currentPreview: string | null = null; + if (body.title !== undefined || body.content_preview !== undefined) { const current = await db - .select({ title: pages.title }) + .select({ title: pages.title, contentPreview: pages.contentPreview }) .from(pages) .where(eq(pages.id, pageId)) .limit(1); - const previousRaw = current[0]?.title ?? null; - const previousTrimmed = typeof previousRaw === "string" ? previousRaw.trim() : ""; + currentTitle = current[0]?.title ?? null; + currentPreview = current[0]?.contentPreview ?? null; + } + + if (body.title !== undefined) { + const previousTrimmed = typeof currentTitle === "string" ? currentTitle.trim() : ""; const nextTrimmed = body.title.trim(); // 正規化(小文字化)して比較することで "Foo" → "foo" のような表記揺れだけの // 変更は伝播をスキップする。`wikiLinkUtils` / `tagUtils` の照合も同一正規化。 @@ -110,13 +166,25 @@ async function applyPagesMetadataUpdate( } } + // 実際に値が異なるカラムだけを set に積む。これにより: + // 1. クライアントがエコーバックしただけのセーブで UPDATE が走らない。 + // 2. UPDATE が走らなければ updatedRow も null になり、emit もスキップされる。 + // Only stage columns whose new value really differs from the current row. + // Skipping no-op UPDATEs avoids spurious `updated_at` churn and ensures + // the SSE emit path is gated on real changes (coderabbitai PR #867). const set: Record = {}; - if (body.title !== undefined) set.title = body.title; - if (body.content_preview !== undefined) set.contentPreview = body.content_preview; - if (Object.keys(set).length === 0) return { renamed }; + if (body.title !== undefined && body.title !== currentTitle) { + set.title = body.title; + } + if (body.content_preview !== undefined && body.content_preview !== currentPreview) { + set.contentPreview = body.content_preview; + } + if (Object.keys(set).length === 0) { + return { renamed, metadataChanged: false, updatedRow: null }; + } set.updatedAt = new Date(); - await db.update(pages).set(set).where(eq(pages.id, pageId)); - return { renamed }; + const updated = await db.update(pages).set(set).where(eq(pages.id, pageId)).returning(); + return { renamed, metadataChanged: true, updatedRow: updated[0] ?? null }; } // ── GET /pages ────────────────────────────────────────────────────────────── @@ -290,9 +358,19 @@ app.put("/:id/content", authRequired, async (c) => { const insertedRow = inserted[0]; if (!insertedRow) throw new HTTPException(500, { message: "Insert failed" }); - const { renamed } = await applyPagesMetadataUpdate(tx, pageId, body); + const { renamed, metadataChanged, updatedRow } = await applyPagesMetadataUpdate( + tx, + pageId, + body, + ); - return { done: true as const, version: insertedRow.version ?? 1, renamed }; + return { + done: true as const, + version: insertedRow.version ?? 1, + renamed, + metadataChanged, + updatedRow, + }; }); if (firstSave.done) { @@ -312,6 +390,9 @@ app.put("/:id/content", authRequired, async (c) => { firstSave.renamed.newTitle, ); } + // Issue #860 Phase 4: メタデータが実際に変化したときだけ通知。 + // Issue #860 Phase 4: emit only when metadata really changed. + emitPageUpdatedIfChanged(firstSave.metadataChanged, firstSave.updatedRow); return c.json({ version: firstSave.version }); } } @@ -344,7 +425,11 @@ app.put("/:id/content", authRequired, async (c) => { const updatedRow = updated[0]; if (!updatedRow) throw new HTTPException(500, { message: "Update failed" }); - const { renamed } = await applyPagesMetadataUpdate(db, pageId, body); + const { + renamed, + metadataChanged, + updatedRow: metadataRow, + } = await applyPagesMetadataUpdate(db, pageId, body); void tryAutoSnapshot( db, @@ -359,6 +444,10 @@ app.put("/:id/content", authRequired, async (c) => { tryPropagateTitleRename(db, pageId, renamed.oldTitle, renamed.newTitle); } + // Issue #860 Phase 4: optimistic-lock 経路のメタデータ変化を通知。 + // Notify subscribers from the optimistic-lock path as well. + emitPageUpdatedIfChanged(metadataChanged, metadataRow); + return c.json({ version: updatedRow.version ?? 0 }); } @@ -382,7 +471,11 @@ app.put("/:id/content", authRequired, async (c) => { }) .returning(); - const { renamed } = await applyPagesMetadataUpdate(db, pageId, body); + const { + renamed, + metadataChanged, + updatedRow: metadataRow, + } = await applyPagesMetadataUpdate(db, pageId, body); const resultRow = result[0]; if (!resultRow) throw new HTTPException(500, { message: "Upsert failed" }); @@ -400,6 +493,10 @@ app.put("/:id/content", authRequired, async (c) => { tryPropagateTitleRename(db, pageId, renamed.oldTitle, renamed.newTitle); } + // Issue #860 Phase 4: UPSERT 経路(楽観的ロック未使用)でも emit。 + // Issue #860 Phase 4: emit from the UPSERT path too (no optimistic lock). + emitPageUpdatedIfChanged(metadataChanged, metadataRow); + return c.json({ version: resultRow.version }); }); @@ -457,6 +554,21 @@ app.post("/", authRequired, async (c) => { const row = result[0]; if (!row) throw new HTTPException(500, { message: "Insert failed" }); + + // Issue #860 Phase 4: 新規ページを所属ノート購読者に通知。本ルートは Web + // Clipper / `/notes/me` 系の創出経路でも使われるため、`/api/notes/:noteId/pages` + // の POST と同じ event を出してフロント側のキャッシュ更新を一本化する。 + // Issue #860 Phase 4: emit `page.added` so subscribers (including the + // `/api/notes/:noteId/events` consumers) update their cached windows + // without a refetch. This route is shared by Web Clipper and `/notes/me` + // flows, so emitting here keeps the cache patch behavior identical to + // `POST /api/notes/:noteId/pages`. + publishNoteEvent({ + type: "page.added", + note_id: row.noteId, + page: pageRowToWindowItem(row), + }); + return c.json( { id: row.id, @@ -500,6 +612,7 @@ app.delete("/:id", authRequired, async (c) => { .select({ thumbnailObjectId: pages.thumbnailObjectId, ownerId: pages.ownerId, + noteId: pages.noteId, }) .from(pages) .where(eq(pages.id, pageId)) @@ -510,6 +623,14 @@ app.delete("/:id", authRequired, async (c) => { .set({ isDeleted: true, thumbnailObjectId: null, updatedAt: new Date() }) .where(eq(pages.id, pageId)); + // Issue #860 Phase 4: 削除を所属ノート購読者に通知。`noteId` は同じ + // SELECT で取得済みのため追加クエリは発生しない。 + // Issue #860 Phase 4: notify subscribers of the owning note. The note id + // came from the earlier SELECT so no extra round trip is needed. + if (target?.noteId) { + publishNoteEvent({ type: "page.deleted", note_id: target.noteId, page_id: pageId }); + } + // GC は best-effort。サムネイル削除が S3 障害などで失敗しても、ページ削除 // 自体は成功させる(ユーザーから見て「削除できなかった」状態を作らない)。 // diff --git a/server/api/src/services/noteEventBroadcaster.ts b/server/api/src/services/noteEventBroadcaster.ts new file mode 100644 index 00000000..dfff7574 --- /dev/null +++ b/server/api/src/services/noteEventBroadcaster.ts @@ -0,0 +1,189 @@ +/** + * ノート単位のページ変更イベントを SSE 購読者へ配送する in-memory pub/sub + * (Issue #860 Phase 4)。`GET /api/notes/:noteId/events` で接続したクライアントは + * 該当ノートに関する `page.added` / `page.updated` / `page.deleted` / + * `note.permission_changed` だけを受け取る。 + * + * In-memory pub/sub used by the note-scoped SSE feed + * (`GET /api/notes/:noteId/events`, issue #860 Phase 4). Listeners are + * partitioned by `note_id`, so page mutations on note A never leak to clients + * watching note B. Producers (POST/DELETE under `routes/notes/pages.ts` and + * `routes/pages.ts`, member / note metadata mutations) call + * `publishNoteEvent` after the underlying DB transaction commits. + * + * `apiErrorBroadcaster` と同じく単一プロセス前提。横展開時は Redis Pub/Sub / + * Postgres LISTEN/NOTIFY に差し替える前提で、呼び出し側は `publish` / + * `subscribe` の関数シグネチャだけに依存している。 + * + * Single-process only — mirrors the `apiErrorBroadcaster` pattern. When the + * API scales horizontally, swap the implementation for Redis Pub/Sub or + * Postgres LISTEN/NOTIFY without touching the call sites; the contract here is + * intentionally `publish(event)` / `subscribe(noteId, listener) -> unsubscribe`. + * + * @see ./apiErrorBroadcaster.ts + * @see ../routes/notes/events.ts + * @see https://github.com/otomatty/zedi/issues/860 + */ +import type { NotePageWindowItem } from "../routes/notes/types.js"; + +/** + * `subscribeNoteEvents` の同時接続数上限(全ノート合算)。1 ノートに対する + * 偏りもまとめて防御する目的で、合計値で上限を設ける。 + * + * Hard cap on simultaneous SSE subscribers across all notes. The cap is on + * the total subscriber set so a single hot note cannot starve the rest by + * filling the pool, while still keeping memory and file descriptors bounded. + */ +export const NOTE_EVENT_STREAM_MAX_SUBSCRIBERS = 256; + +/** + * `page.added` / `page.updated` のペイロード本体。`GET /api/notes/:noteId/pages` + * の {@link NotePageWindowItem} と同形にしておくことで、フロント側の React + * Query infinite cache に `setQueriesData` で直接挿入できる。 + * + * Payload of `page.added` / `page.updated`. The shape mirrors + * `NotePageWindowItem` from the windowed list endpoint so the client can + * splice the event directly into its `useInfiniteNotePages` cache via + * `setQueriesData` without a re-fetch. + */ +export type NoteEventPageSnapshot = NotePageWindowItem; + +/** + * SSE で配送するノートイベントの discriminated union (Issue #860 Phase 4)。 + * + * - `page.added` : 新規ページが note に作成された + * - `page.updated` : 既存ページのタイトル / preview / thumbnail が変わった + * - `page.deleted` : ページが soft-delete された + * - `note.permission_changed` : note の visibility / edit_permission / + * member / domain rule など、`getNoteRole` の + * 解釈に影響する変更があった。クライアントは + * window / details / members を invalidate する。 + * + * Discriminated union of note-scoped SSE events delivered to the + * `/api/notes/:noteId/events` endpoint. `page.added` / `page.updated` carry + * the full window-shape page snapshot; `page.deleted` carries only the page + * id; `note.permission_changed` is a sentinel that signals subscribers to + * invalidate the note's window / details / members caches. + */ +export type NoteEvent = + | { type: "page.added"; note_id: string; page: NoteEventPageSnapshot } + | { type: "page.updated"; note_id: string; page: NoteEventPageSnapshot } + | { type: "page.deleted"; note_id: string; page_id: string } + | { type: "note.permission_changed"; note_id: string }; + +/** + * `subscribeNoteEvents` のリスナー型。同期コールバック。SSE ルートは + * Promise チェーンで非同期に直列化して書き出す。 + * + * Synchronous listener signature. The SSE route serializes writes through a + * Promise chain so callbacks must not be async themselves. + */ +export type NoteEventListener = (event: NoteEvent) => void; + +const listenersByNote = new Map>(); +let totalSubscribers = 0; + +/** + * `subscribeNoteEvents` が上限に達した時に投げる例外。SSE ルートは 503 に + * マップして backoff を促す。 + * + * Thrown by `subscribeNoteEvents` when the subscriber cap is reached. The + * SSE route surfaces it as a 503 so clients back off and retry. + */ +export class NoteEventStreamCapacityExceededError extends Error { + constructor() { + super(`note_event stream subscriber cap reached (${NOTE_EVENT_STREAM_MAX_SUBSCRIBERS})`); + this.name = "NoteEventStreamCapacityExceededError"; + } +} + +/** + * 指定ノートのイベント購読者を登録する。返り値の `unsubscribe` を必ず + * `stream.onAbort` などから呼んでメモリリークを防ぐ。 + * + * Register a listener for events on `noteId`. Always call the returned + * `unsubscribe` from the SSE handler's abort path so the listener set does + * not grow unbounded when clients reconnect. + */ +export function subscribeNoteEvents(noteId: string, listener: NoteEventListener): () => void { + if (totalSubscribers >= NOTE_EVENT_STREAM_MAX_SUBSCRIBERS) { + throw new NoteEventStreamCapacityExceededError(); + } + let bucket = listenersByNote.get(noteId); + if (!bucket) { + bucket = new Set(); + listenersByNote.set(noteId, bucket); + } + // `Set.add` は冪等なので、同じ listener が二重 subscribe された場合に + // bucket.size は増えない。`totalSubscribers` の増減を `bucket.size` の差で + // 駆動することで、unsubscribe との計数が必ず対称になる + // (coderabbitai review on PR #867 major)。 + // `Set.add` is idempotent — a double-subscribe of the same function reference + // does not enlarge the set. Drive the accounting off the size delta so the + // add/remove sides stay symmetric and bogus capacity rejections cannot drift + // in (coderabbitai review on PR #867 major). + const beforeSize = bucket.size; + bucket.add(listener); + const added = bucket.size > beforeSize; + if (added) totalSubscribers += 1; + + let active = true; + return () => { + if (!active) return; + active = false; + if (!added) return; + const current = listenersByNote.get(noteId); + if (!current) return; + if (!current.delete(listener)) return; + totalSubscribers -= 1; + if (current.size === 0) { + listenersByNote.delete(noteId); + } + }; +} + +/** + * `event.note_id` の購読者全員へイベントを配送する。リスナーが throw しても + * 他の購読者には影響しない(個別 try/catch でログだけ残す)。 + * + * Fan out `event` to every subscriber of `event.note_id`. A listener that + * throws is logged and skipped, mirroring `apiErrorBroadcaster` semantics so + * one buggy connection cannot break broadcast for the rest. + */ +export function publishNoteEvent(event: NoteEvent): void { + const bucket = listenersByNote.get(event.note_id); + if (!bucket || bucket.size === 0) return; + // イベント配信中に listener が unsubscribe / subscribe しても安全なように + // スナップショットしてから反復する。 + // Snapshot the listener set before iteration so a listener that + // un/subscribes during dispatch does not perturb the in-progress fan-out. + const snapshot = Array.from(bucket); + for (const listener of snapshot) { + try { + listener(event); + } catch (err) { + const message = err instanceof Error ? err.message : String(err); + console.error(`[note-event-broadcaster] subscriber threw: ${message}`); + } + } +} + +/** + * 購読者数を取得する。`noteId` を渡すとそのノートの購読者数、未指定なら全体数。 + * + * Subscriber count. Pass a `noteId` for the per-note total, omit it for the + * aggregate. Used by the SSE route for capacity checks and by tests. + */ +export function noteEventSubscriberCount(noteId?: string): number { + if (noteId === undefined) return totalSubscribers; + return listenersByNote.get(noteId)?.size ?? 0; +} + +/** + * 全購読者を強制解除する。テスト用ヘルパ。 + * Drop every subscriber. Test-only helper. + */ +export function clearNoteEventSubscribers(): void { + listenersByNote.clear(); + totalSubscribers = 0; +} diff --git a/src/components/page/PageGrid.test.tsx b/src/components/page/PageGrid.test.tsx index a18ec5c6..07b6849b 100644 --- a/src/components/page/PageGrid.test.tsx +++ b/src/components/page/PageGrid.test.tsx @@ -60,6 +60,16 @@ vi.mock("@/hooks/useNoteQueries", () => ({ }), })); +// Issue #860 Phase 4: PageGrid は note 文脈で `useNotePageEvents` を呼んで +// SSE を購読するが、本テストの責務は仮想化レンダリングのため、フックは +// no-op にして QueryClientProvider 依存を避ける。 +// PageGrid wires `useNotePageEvents` to subscribe to the note SSE feed in +// note context. This test focuses on virtualization, so stub the hook to a +// no-op and skip the QueryClientProvider it would otherwise require. +vi.mock("@/hooks/useNotePageEvents", () => ({ + useNotePageEvents: vi.fn(), +})); + vi.mock("./PageCard", () => ({ default: vi.fn(({ page }: { page: PageSummary }) => (
diff --git a/src/components/page/PageGrid.tsx b/src/components/page/PageGrid.tsx index 2d7fff13..5a3abc06 100644 --- a/src/components/page/PageGrid.tsx +++ b/src/components/page/PageGrid.tsx @@ -4,6 +4,7 @@ import { useAuth } from "@/hooks/useAuth"; import { useContainerColumns } from "@/hooks/useContainerColumns"; import { usePagesSummary, useSyncStatus } from "@/hooks/usePageQueries"; import { useInfiniteNotePages } from "@/hooks/useNoteQueries"; +import { useNotePageEvents } from "@/hooks/useNotePageEvents"; import PageCard from "./PageCard"; import EmptyState from "./EmptyState"; import { cn, Skeleton } from "@zedi/ui"; @@ -178,6 +179,15 @@ const PageGrid: React.FC = ({ // query (issue #860 Phase 3). The server's `updated_at DESC, id DESC` order // is preserved verbatim; no client-side resort. const noteInfinite = useInfiniteNotePages(noteId ?? "", { enabled: isNoteContext }); + // Issue #860 Phase 4: ノート画面を開いている間だけ SSE を張り、ページ追加・ + // 更新・削除・権限変更イベントを React Query infinite cache に差分適用する。 + // 結果として `useAddPageToNote` / `useRemovePageFromNote` の onSuccess + // invalidate と二重になるが、SSE 未接続時の保険として双方残す。 + // Issue #860 Phase 4: subscribe to the note-scoped SSE feed while the grid + // is visible so page mutations elsewhere patch the React Query cache + // without a full refetch. Existing mutation `onSuccess` invalidations stay + // in place as a fallback when the SSE connection hasn't established yet. + useNotePageEvents(noteId ?? "", { enabled: isNoteContext }); const pages: PageSummary[] = isNoteContext ? noteInfinite.pages : (personalQuery.data ?? []); const isLoading = isNoteContext ? noteInfinite.isLoading : personalQuery.isLoading; diff --git a/src/hooks/useNotePageEvents.test.ts b/src/hooks/useNotePageEvents.test.ts new file mode 100644 index 00000000..026cf6a5 --- /dev/null +++ b/src/hooks/useNotePageEvents.test.ts @@ -0,0 +1,281 @@ +/** + * `useNotePageEvents` / `applyNoteEventToCache` のユニットテスト (Issue #860 Phase 4)。 + * + * Unit tests for the note SSE event cache patcher (Issue #860 Phase 4). + * Drives `applyNoteEventToCache` directly against a real `QueryClient` so the + * `setQueriesData` patch logic is verified against `useInfiniteNotePages`'s + * actual query key contract (`noteKeys.pagesWindowByNoteId`). + * + * @see ./useNotePageEvents.ts + * @see ./useNoteQueries.ts + * @see https://github.com/otomatty/zedi/issues/860 + */ +import { describe, it, expect } from "vitest"; +import { QueryClient, type InfiniteData } from "@tanstack/react-query"; +import type { NotePageWindowItem, NotePageWindowResponse } from "@/lib/api/types"; +import type { NoteEvent } from "@/lib/api/noteEvents"; +import { applyNoteEventToCache } from "./useNotePageEvents"; +import { noteKeys } from "./useNoteQueries"; + +const NOTE_ID = "00000000-0000-4000-8000-00000000000a"; +const USER_ID = "user-1"; +const USER_EMAIL = "user@example.com"; +const INCLUDE = ["preview", "thumbnail"] as const; +const PAGE_SIZE = 50; + +function makeItem(id: string, overrides: Partial = {}): NotePageWindowItem { + return { + id, + owner_id: USER_ID, + note_id: NOTE_ID, + source_page_id: null, + title: `Title ${id}`, + content_preview: `preview ${id}`, + thumbnail_url: null, + source_url: null, + created_at: "2026-05-14T00:00:00Z", + updated_at: "2026-05-14T00:00:00Z", + is_deleted: false, + ...overrides, + }; +} + +/** + * 1 window のみのキャッシュをセットアップしてクエリクライアントを返す。 + * Seed a single-window infinite cache for `noteId` and return the client. + */ +function seedCache(items: NotePageWindowItem[]): QueryClient { + const client = new QueryClient(); + const data: InfiniteData = { + pages: [{ items, next_cursor: null }], + pageParams: [null], + }; + client.setQueryData(noteKeys.pagesWindow(NOTE_ID, USER_ID, USER_EMAIL, INCLUDE, PAGE_SIZE), data); + return client; +} + +function readCache(client: QueryClient): InfiniteData { + const data = client.getQueryData>( + noteKeys.pagesWindow(NOTE_ID, USER_ID, USER_EMAIL, INCLUDE, PAGE_SIZE), + ); + if (!data) throw new Error("expected cached data"); + return data; +} + +describe("applyNoteEventToCache", () => { + it("prepends a new page on page.added without disturbing existing items", () => { + const client = seedCache([makeItem("pg-1"), makeItem("pg-2")]); + const newPage = makeItem("pg-new", { title: "Fresh" }); + + const event: NoteEvent = { type: "page.added", note_id: NOTE_ID, page: newPage }; + applyNoteEventToCache(client, event); + + const data = readCache(client); + expect(data.pages[0]?.items.map((i) => i.id)).toEqual(["pg-new", "pg-1", "pg-2"]); + expect(data.pages[0]?.items[0]?.title).toBe("Fresh"); + }); + + it("does not duplicate on page.added when the id already exists", () => { + // mutation の onSuccess invalidate と SSE の page.added が両方走った場合、 + // 同じ id が 2 度 prepend されないことを担保する。 + // When the mutation's onSuccess invalidate races with the SSE event, we + // must not prepend the same id twice. + const existing = makeItem("pg-dup", { title: "Already there" }); + const client = seedCache([existing, makeItem("pg-other")]); + + const event: NoteEvent = { + type: "page.added", + note_id: NOTE_ID, + page: makeItem("pg-dup", { title: "Echo" }), + }; + applyNoteEventToCache(client, event); + + const data = readCache(client); + const ids = data.pages[0]?.items.map((i) => i.id) ?? []; + expect(ids).toEqual(["pg-dup", "pg-other"]); + // 既存行は触らない(mutation onSuccess の invalidate と二重発火しても安全)。 + // The existing row is left untouched; safe under double-firing. + expect(data.pages[0]?.items[0]?.title).toBe("Already there"); + }); + + it("moves the updated row to the head of the first window on page.updated (remove + prepend)", () => { + // PUT /content の metadata 更新は `updated_at` を bump するので、サーバ順 + // (`updated_at DESC, id DESC`) では更新ページが必ず先頭に来る。クライアント + // でも同じ移動セマンティクスを再現する。coderabbitai major on PR #867。 + // + // The server bumps `updated_at` on metadata edits, so the row must end + // up at the head of the cached windows to match the server's + // `updated_at DESC, id DESC` ordering (coderabbitai PR #867 major). + const client = seedCache([ + makeItem("pg-1", { title: "Original 1" }), + makeItem("pg-2", { title: "Original 2" }), + makeItem("pg-3", { title: "Original 3" }), + ]); + + const event: NoteEvent = { + type: "page.updated", + note_id: NOTE_ID, + page: makeItem("pg-2", { title: "Updated 2", content_preview: "new preview" }), + }; + applyNoteEventToCache(client, event); + + const data = readCache(client); + // pg-2 が先頭へ移動し、pg-1 / pg-3 は元の相対順を保つ。 + // pg-2 jumps to the head; the others preserve their relative order. + expect(data.pages[0]?.items.map((i) => i.id)).toEqual(["pg-2", "pg-1", "pg-3"]); + expect(data.pages[0]?.items[0]?.title).toBe("Updated 2"); + expect(data.pages[0]?.items[0]?.content_preview).toBe("new preview"); + }); + + it("inserts a previously-uncached row at the head on page.updated", () => { + // ローカルキャッシュにない id でも、サーバが update を通知してきたら + // 「最新行」として先頭に積む。次の natural refetch で前後関係は収束する。 + // If the id is not cached locally yet, treat the updated event as a + // fresh "latest row" and prepend. The next natural refetch reconciles. + const client = seedCache([makeItem("pg-1")]); + const event: NoteEvent = { + type: "page.updated", + note_id: NOTE_ID, + page: makeItem("pg-missing", { title: "Brand new" }), + }; + applyNoteEventToCache(client, event); + + const data = readCache(client); + expect(data.pages[0]?.items.map((i) => i.id)).toEqual(["pg-missing", "pg-1"]); + expect(data.pages[0]?.items[0]?.title).toBe("Brand new"); + }); + + it("moves a row across windows on page.updated when it lived in a later window", () => { + // ページ id が後続 window に居ても削除 + 先頭 window への prepend を行う。 + // The row may live in any window; ensure it's stripped from the later + // window and inserted at the head of the first one. + const client = new QueryClient(); + const data: InfiniteData = { + pages: [ + { items: [makeItem("pg-a")], next_cursor: "cursor-1" }, + { items: [makeItem("pg-b")], next_cursor: null }, + ], + pageParams: [null, "cursor-1"], + }; + client.setQueryData( + noteKeys.pagesWindow(NOTE_ID, USER_ID, USER_EMAIL, INCLUDE, PAGE_SIZE), + data, + ); + + applyNoteEventToCache(client, { + type: "page.updated", + note_id: NOTE_ID, + page: makeItem("pg-b", { title: "Promoted" }), + }); + + const out = client.getQueryData>( + noteKeys.pagesWindow(NOTE_ID, USER_ID, USER_EMAIL, INCLUDE, PAGE_SIZE), + ); + expect(out?.pages[0]?.items.map((i) => i.id)).toEqual(["pg-b", "pg-a"]); + expect(out?.pages[1]?.items.map((i) => i.id)).toEqual([]); + }); + + it("removes the matching id on page.deleted", () => { + const client = seedCache([makeItem("pg-1"), makeItem("pg-2"), makeItem("pg-3")]); + + const event: NoteEvent = { type: "page.deleted", note_id: NOTE_ID, page_id: "pg-2" }; + applyNoteEventToCache(client, event); + + const data = readCache(client); + expect(data.pages[0]?.items.map((i) => i.id)).toEqual(["pg-1", "pg-3"]); + }); + + it("invalidates pages/details/members on note.permission_changed", () => { + // `note.permission_changed` は権限の再評価を促すセンチネルなので、3 系列 + // を invalidate(fetchStatus が dirty 化)することを確認する。 + // The sentinel triggers re-evaluation of 3 cache families; we assert + // that each one is marked stale (`invalidate`) after the dispatch. + const client = seedCache([makeItem("pg-1")]); + + // 別途、members / detail 用のダミーキャッシュも種付けする。 + // Seed dummy entries for the members and detail caches too. + client.setQueryData(noteKeys.memberList(NOTE_ID), [{ noteId: NOTE_ID }]); + client.setQueryData(noteKeys.detail(NOTE_ID, USER_ID, USER_EMAIL), { id: NOTE_ID }); + + const event: NoteEvent = { type: "note.permission_changed", note_id: NOTE_ID }; + applyNoteEventToCache(client, event); + + // `invalidateQueries` は該当キャッシュを stale にマークする。`isStale()` + // で検証する。 + // `invalidateQueries` marks matching entries stale; check via `isStale()`. + const windowState = client.getQueryState( + noteKeys.pagesWindow(NOTE_ID, USER_ID, USER_EMAIL, INCLUDE, PAGE_SIZE), + ); + const memberState = client.getQueryState(noteKeys.memberList(NOTE_ID)); + const detailState = client.getQueryState(noteKeys.detail(NOTE_ID, USER_ID, USER_EMAIL)); + expect(windowState?.isInvalidated).toBe(true); + expect(memberState?.isInvalidated).toBe(true); + expect(detailState?.isInvalidated).toBe(true); + }); + + it("patches every window variant for the same note (different include/pageSize)", () => { + // useInfiniteNotePages は include と pageSize で別キーになる。`page.added` + // のような prefix invalidate は両方に効く必要がある。 + // `useInfiniteNotePages` keys vary by include and pageSize. A prefix + // patch must update every variant, not just one. + // + // coderabbitai minor on PR #867: 各キーには別 InfiniteData インスタンスを + // 渡す。同一参照を共有するとインプレース変更バグを取り逃がしうるため。 + // coderabbitai minor on PR #867: seed each key with a distinct + // InfiniteData object — sharing references can mask mutation bugs. + const client = new QueryClient(); + const makeFreshData = (): InfiniteData => ({ + pages: [{ items: [makeItem("pg-1")], next_cursor: null }], + pageParams: [null], + }); + client.setQueryData( + noteKeys.pagesWindow(NOTE_ID, USER_ID, USER_EMAIL, ["preview", "thumbnail"], 50), + makeFreshData(), + ); + client.setQueryData( + noteKeys.pagesWindow(NOTE_ID, USER_ID, USER_EMAIL, ["preview"], 25), + makeFreshData(), + ); + + applyNoteEventToCache(client, { + type: "page.added", + note_id: NOTE_ID, + page: makeItem("pg-fresh"), + }); + + const a = client.getQueryData>( + noteKeys.pagesWindow(NOTE_ID, USER_ID, USER_EMAIL, ["preview", "thumbnail"], 50), + ); + const b = client.getQueryData>( + noteKeys.pagesWindow(NOTE_ID, USER_ID, USER_EMAIL, ["preview"], 25), + ); + expect(a?.pages[0]?.items.map((i) => i.id)).toEqual(["pg-fresh", "pg-1"]); + expect(b?.pages[0]?.items.map((i) => i.id)).toEqual(["pg-fresh", "pg-1"]); + }); + + it("does not touch other notes' caches", () => { + // 別 noteId のキャッシュは prefix が違うので影響を受けない。 + // Other notes have different prefix keys, so their caches stay intact. + const otherNoteId = "00000000-0000-4000-8000-00000000000b"; + const client = new QueryClient(); + const data: InfiniteData = { + pages: [{ items: [makeItem("pg-other-1")], next_cursor: null }], + pageParams: [null], + }; + client.setQueryData( + noteKeys.pagesWindow(otherNoteId, USER_ID, USER_EMAIL, INCLUDE, PAGE_SIZE), + data, + ); + + applyNoteEventToCache(client, { + type: "page.added", + note_id: NOTE_ID, + page: makeItem("pg-fresh"), + }); + + const other = client.getQueryData>( + noteKeys.pagesWindow(otherNoteId, USER_ID, USER_EMAIL, INCLUDE, PAGE_SIZE), + ); + expect(other?.pages[0]?.items.map((i) => i.id)).toEqual(["pg-other-1"]); + }); +}); diff --git a/src/hooks/useNotePageEvents.ts b/src/hooks/useNotePageEvents.ts new file mode 100644 index 00000000..9f03d98c --- /dev/null +++ b/src/hooks/useNotePageEvents.ts @@ -0,0 +1,386 @@ +/** + * `useNotePageEvents` — `/api/notes/:noteId/events` SSE フィードを購読し、 + * `useInfiniteNotePages` の React Query infinite cache を差分パッチするフック + * (Issue #860 Phase 4)。 + * + * Subscribes to the note-scoped SSE feed and applies received events to the + * `useInfiniteNotePages` cache in place via `queryClient.setQueriesData`, so + * page mutations elsewhere (other tabs, other clients) propagate without a + * full window refetch. Reconnects via `EventSource`'s built-in `retry`, and + * after every successful (re)connect runs a single + * `invalidateQueries(noteKeys.pagesWindowByNoteId)` to recover anything that + * mutated during the gap. Falls back gracefully when `EventSource` is not + * defined (e.g. SSR / test environments without polyfill). + * + * @see ../lib/api/noteEvents.ts + * @see ./useNoteQueries.ts + * @see https://github.com/otomatty/zedi/issues/860 + */ +import { useEffect } from "react"; +import { type InfiniteData, useQueryClient } from "@tanstack/react-query"; +import type { NotePageWindowItem, NotePageWindowResponse } from "@/lib/api/types"; +import type { NoteEvent } from "@/lib/api/noteEvents"; +import { noteKeys } from "@/hooks/useNoteQueries"; + +/** + * `useNotePageEvents` の実行オプション。 + * + * Runtime options for {@link useNotePageEvents}. + */ +export interface UseNotePageEventsOptions { + /** + * 購読の有効化フラグ。`PageGrid` のように note 画面が開いている間だけ接続 + * したい呼び出し側で `false` を渡してマウント中の SSE 接続を抑制する。 + * + * Gate to enable the subscription. Pass `false` to suppress the SSE + * connection without unmounting the hook (e.g. when the note grid is + * hidden behind a tab). + */ + enabled?: boolean; +} + +/** + * 指定 id を `NotePageWindowItem` 配列から取り除く。`removed` フラグで + * 実際に該当行があったかを呼び出し側へ伝える(無変更時はキャッシュエントリ + * 自体を再構築しないための判定に使う)。 + * + * Strip the entry with `id` from the items array. `removed` lets the caller + * skip recreating the cache entry when nothing changed (avoids spurious + * React Query notifications). + */ +function removeIdFromItems( + items: NotePageWindowItem[], + id: string, +): { items: NotePageWindowItem[]; removed: boolean } { + const next = items.filter((item) => item.id !== id); + return { items: next, removed: next.length !== items.length }; +} + +/** + * `setQueriesData` ヘルパ: 指定 noteId に紐づく `pagesWindow` の各キャッシュを + * 走査し、コールバックで新しい `InfiniteData` を返す。 + * + * Walk every cached `pagesWindow` query for the given note and let the + * callback produce a fresh `InfiniteData`. Returning + * the same data reference is fine; React Query skips notifying observers + * when reference equality holds. + */ +function updateAllWindowsForNote( + queryClient: ReturnType, + noteId: string, + transform: ( + data: InfiniteData, + ) => InfiniteData, +): void { + queryClient.setQueriesData>( + { queryKey: noteKeys.pagesWindowByNoteId(noteId) }, + (data) => { + if (!data) return data; + return transform(data); + }, + ); +} + +/** + * SSE で受け取った `NoteEvent` を `queryClient` のキャッシュへ適用する。 + * + * Apply a received {@link NoteEvent} to the React Query cache. Pure dispatch + * over the discriminated union — kept top-level so the hook body stays + * small and unit tests can drive it directly. + */ +export function applyNoteEventToCache( + queryClient: ReturnType, + event: NoteEvent, +): void { + switch (event.type) { + case "page.added": { + // 新規ページは window 全体で最新の `updated_at` を持つはず。最初の + // window の items 先頭へ prepend し、重複(同一 id)がある場合は + // 何もしない(mutation の onSuccess invalidate と被るケース)。 + // The new page must have the highest `updated_at`, so prepend it to + // the first window. Skip when an item with the same id already exists + // to avoid duplicates after the mutation's `invalidateQueries` + // settles concurrently. + updateAllWindowsForNote(queryClient, event.note_id, (data) => { + const first = data.pages[0]; + if (!first) return data; + if (first.items.some((it) => it.id === event.page.id)) { + return data; + } + const nextFirst: NotePageWindowResponse = { + items: [event.page, ...first.items], + next_cursor: first.next_cursor, + }; + return { + ...data, + pages: [nextFirst, ...data.pages.slice(1)], + }; + }); + return; + } + case "page.updated": { + // PUT /content の metadata 更新は `updated_at` を bump するので、 + // サーバ順 (`updated_at DESC, id DESC`) では更新ページが必ず先頭に来る。 + // クライアントキャッシュも同様に「現在いる window から取り除いて先頭 + // window に prepend」する remove+prepend セマンティクスで合わせる。 + // 再ソートはしない (Phase 3 の「サーバ順を信頼する」要件)。 + // coderabbitai review on PR #867 major: in-place 置換だと旧 window に + // 取り残されてしまうため移動が必要。 + // + // `page.updated` bumps `updated_at`, which puts the row at the head + // of the server's `updated_at DESC, id DESC` ordering. Apply the same + // shift to the cached windows by removing the row wherever it lives + // and prepending the fresh copy to the first window. We do not + // re-sort anything else (Phase 3 invariant); the next natural + // refetch reconciles any cross-window drift. Fixes coderabbitai + // major on PR #867 — replace-in-place left the row stranded in its + // old window. + updateAllWindowsForNote(queryClient, event.note_id, (data) => { + const first = data.pages[0]; + if (!first) return data; + + let anyChanged = false; + const stripped = data.pages.map((page) => { + const { items, removed } = removeIdFromItems(page.items, event.page.id); + if (!removed) return page; + anyChanged = true; + return { items, next_cursor: page.next_cursor }; + }); + + const head = stripped[0]; + if (!head) return data; + + // 重複防止: 既に先頭 window に新しい event.page と同じ id があれば + // 何もしない(同一フレーム内で page.added → page.updated が連続する + // ような並びでの重複防止)。`stripped` で既に取り除いているので、 + // 通常はこの分岐に入らないが、データ不整合への防御として残す。 + // Defensive dedupe: even after stripping, if the first window + // already contains the id (e.g. another concurrent event), skip + // the prepend to avoid duplicates. + if (head.items.some((it) => it.id === event.page.id)) { + return anyChanged ? { ...data, pages: stripped } : data; + } + + const nextFirst: NotePageWindowResponse = { + items: [event.page, ...head.items], + next_cursor: head.next_cursor, + }; + return { + ...data, + pages: [nextFirst, ...stripped.slice(1)], + }; + }); + return; + } + case "page.deleted": { + updateAllWindowsForNote(queryClient, event.note_id, (data) => { + let anyChanged = false; + const nextPages = data.pages.map((page) => { + const { items, removed } = removeIdFromItems(page.items, event.page_id); + if (!removed) return page; + anyChanged = true; + return { items, next_cursor: page.next_cursor }; + }); + if (!anyChanged) return data; + return { ...data, pages: nextPages }; + }); + return; + } + case "note.permission_changed": { + // 権限の変化は `getNoteRole` の結果を変えるため、details / window / + // members の 3 系列を invalidate して次レンダリングで再評価させる。 + // Permission changes flip `getNoteRole`, so invalidate the three + // related caches and let the next render fetch fresh values. + queryClient.invalidateQueries({ + queryKey: noteKeys.detailsByNoteId(event.note_id), + }); + queryClient.invalidateQueries({ + queryKey: noteKeys.pagesWindowByNoteId(event.note_id), + }); + queryClient.invalidateQueries({ + queryKey: noteKeys.memberList(event.note_id), + }); + return; + } + } +} + +/** + * 安全に JSON.parse する。失敗時は null を返してログに残す。 + * Safely parse a JSON payload; logs and returns null on failure so a + * malformed frame from the server does not crash the React tree. + */ +function tryParseJson(raw: string): unknown { + try { + return JSON.parse(raw); + } catch (err) { + console.warn("[useNotePageEvents] failed to parse event payload:", err); + return null; + } +} + +/** + * `EventSource` のインスタンスにイベントハンドラを束ねるユーティリティ。 + * `EventSource` ハンドラはイベント名ごとに `addEventListener` で登録するため、 + * unmount で確実にすべて剥がせるよう登録の戻りを集めて返す。 + * + * Attach typed listeners to an `EventSource`. Each SSE event name (`ready`, + * `page.added`, …) has its own listener registered via `addEventListener`, + * so we collect detach callbacks and run them on cleanup to avoid leaks. + */ +function attachNoteEventListeners( + es: EventSource, + onReady: () => void, + onEvent: (event: NoteEvent) => void, +): () => void { + const detachers: Array<() => void> = []; + + const wrap = (name: string, handler: (msg: MessageEvent) => void) => { + es.addEventListener(name, handler as EventListener); + detachers.push(() => es.removeEventListener(name, handler as EventListener)); + }; + + wrap("ready", () => { + onReady(); + }); + for (const name of ["page.added", "page.updated", "page.deleted", "note.permission_changed"]) { + wrap(name, (msg) => { + const parsed = tryParseJson(msg.data) as Record | null; + if (!parsed || typeof parsed !== "object") return; + // SSE のイベント名 `name` を信頼の単一源とする。サーバ payload の + // `type` を後から上書きされないよう、`...parsed` を先に展開して `type` + // を後置きで固定する (coderabbitai minor on PR #867)。 + // Make the wire SSE event name authoritative. Spreading `parsed` + // first and clamping `type: name` afterwards prevents a malformed + // payload from overriding the discriminator that the + // `addEventListener` wiring already validated (coderabbitai minor + // on PR #867). + const event = { ...parsed, type: name } as NoteEvent; + onEvent(event); + }); + } + + return () => { + for (const d of detachers) d(); + }; +} + +/** + * Returns the base URL configured for the API. Mirrors the resolution used by + * `createApiClient` so the SSE endpoint and the REST endpoints stay in sync. + * + * `createApiClient` と同じ手順で API のベース URL を解決する。SSE エンドポイント + * と REST エンドポイントの解決パスを揃える。 + */ +function resolveApiBaseUrl(): string { + // import.meta.env は Vite 経由でビルド時にインライン化される。 + // `import.meta.env` is inlined by Vite at build time. + const env = (import.meta as ImportMeta).env as Record | undefined; + return env?.VITE_API_BASE_URL ?? ""; +} + +/** + * ノート画面マウント中だけ `/api/notes/:noteId/events` を購読する hook。 + * 受信イベントは React Query キャッシュへ差分適用するため、`PageGrid` 側で + * 追加の `invalidateQueries` 呼び出しは不要(mutation の `onSuccess` invalidate + * は SSE 未接続時のフォールバックとして残す)。 + * + * Subscribes to the note-scoped SSE feed while the consuming component is + * mounted. Received events patch the React Query cache so the grid updates + * without a fresh refetch. `PageGrid` does not need to invalidate manually; + * existing mutation `onSuccess` invalidations remain as a fallback for + * environments where the SSE connection hasn't established yet. + */ +export function useNotePageEvents(noteId: string, options: UseNotePageEventsOptions = {}): void { + const { enabled = true } = options; + const queryClient = useQueryClient(); + + useEffect(() => { + if (!enabled || !noteId) return; + if (typeof EventSource === "undefined") return; + + const base = resolveApiBaseUrl(); + const url = `${base}/api/notes/${encodeURIComponent(noteId)}/events`; + + // 接続を作るたびに detach / close を覚えておくセル。`note.permission_changed` + // で即時 rotate するため、useEffect cleanup と中身の reconnect の両方から + // 同じハンドルを操作する。 + // Cell holding the current connection so both the useEffect cleanup + // and the in-callback rotation on `note.permission_changed` can close + // and replace the active EventSource via a single owner. + let currentEs: EventSource | null = null; + let currentDetach: (() => void) | null = null; + + const open = () => { + // EventSource は cookie ベース認証で動くため `withCredentials: true` を + // 渡す。same-origin 構成(VITE_API_BASE_URL が空)でも明示しておく。 + // EventSource needs `withCredentials: true` so the cookie-based auth + // travels with the request. Explicit even on same-origin builds. + const es = new EventSource(url, { withCredentials: true }); + currentEs = es; + + currentDetach = attachNoteEventListeners( + es, + () => { + // 毎回の `ready` で window キャッシュを 1 度 invalidate する。 + // 初回 ready は `useInfiniteNotePages` のクエリ完了から SSE + // subscribe が live になるまでの T0→subscribe ギャップを補修し、 + // 再接続時の ready は切断中の取りこぼしを補修する + // (Codex P2 / coderabbitai PR #867)。サーバが ready 送信前に + // subscribe するため、ready 後に来る event は失われない。 + // + // Invalidate the pages window on every `ready`, including the + // first one. The first ready covers the T0→subscribe race + // between `useInfiniteNotePages` finishing its initial fetch + // and the SSE subscription becoming live; subsequent readys + // (reconnects) cover the disconnect gap. The server registers + // its subscription before sending ready, so anything published + // after that point arrives via the SSE channel. + queryClient.invalidateQueries({ + queryKey: noteKeys.pagesWindowByNoteId(noteId), + }); + }, + (event) => { + applyNoteEventToCache(queryClient, event); + // Issue #860 Phase 4: 権限変化を検知したら EventSource をその場で + // 閉じて張り直す。サーバ側も `note.permission_changed` を書き出した + // 後にストリームを閉じるが、`retry: 30000` の auto-reconnect 待ち + // が走る前にクライアント側で即時再接続することで、新権限の + // 再評価とフィード回復のレイテンシを最小化する + // (Codex P1 / coderabbitai critical on PR #867)。 + // + // Client-side rotation on permission change: the server closes + // the stream after delivering this event, but EventSource would + // otherwise wait `retry: 30000` ms before reconnecting. Closing + // and re-opening here makes the re-auth round trip happen + // immediately. + if (event.type === "note.permission_changed") { + currentDetach?.(); + es.close(); + currentEs = null; + currentDetach = null; + open(); + } + }, + ); + + // `onerror` は EventSource の自動再接続が走る前にも呼ばれる。ログするだけ + // にして接続維持はブラウザに任せる(自動再接続が走らないようにしたい場合 + // は close を呼ぶが、ここでは保守的に何もしない)。 + // `onerror` fires before EventSource's built-in reconnect; just log so + // the browser's reconnect path runs unimpeded. + es.onerror = (ev) => { + console.warn("[useNotePageEvents] EventSource error", ev); + }; + }; + + open(); + + return () => { + currentDetach?.(); + currentEs?.close(); + currentEs = null; + currentDetach = null; + }; + }, [noteId, enabled, queryClient]); +} diff --git a/src/lib/api/index.ts b/src/lib/api/index.ts index c7c85797..f627e517 100644 --- a/src/lib/api/index.ts +++ b/src/lib/api/index.ts @@ -1,2 +1,11 @@ export { createApiClient, ApiError, type ApiClient, type ApiClientOptions } from "./apiClient"; export type * from "./types"; +export { + NOTE_EVENT_NAMES, + type NoteEventName, + type NoteEvent, + type NotePageEventData, + type NotePageDeletedEventData, + type NotePermissionChangedEventData, + type NoteReadyEventData, +} from "./noteEvents"; diff --git a/src/lib/api/noteEvents.ts b/src/lib/api/noteEvents.ts new file mode 100644 index 00000000..99904acd --- /dev/null +++ b/src/lib/api/noteEvents.ts @@ -0,0 +1,110 @@ +/** + * `/api/notes/:noteId/events` SSE フィードの wire 型 (Issue #860 Phase 4)。 + * + * サーバ側 `server/api/src/services/noteEventBroadcaster.ts` の `NoteEvent` + * union と同じ shape を、フロント側で独立に宣言する(既存方針: server コードを + * 直接 import しない)。サーバが送る `event:` フィールドが各イベント名 + * (`page.added` / `page.updated` / `page.deleted` / `note.permission_changed`) + * になるため、union の `type` 値もそれに揃える。`page.added` / `page.updated` + * の `page` payload は `NotePageWindowItem` と同形で、preview / thumbnail を + * 常に含む(SSE 経路は include 選択をサポートしない)。 + * + * Wire types for the `/api/notes/:noteId/events` SSE feed (Issue #860 Phase 4). + * Declared independently from `server/api/src/services/noteEventBroadcaster.ts` + * to keep the frontend free of server-side imports, but the discriminator + * (`type`) and field names match exactly so the SSE bytes can be cast + * directly into a {@link NoteEvent}. The `page` payload mirrors + * {@link NotePageWindowItem} and always carries `content_preview` / + * `thumbnail_url` (the SSE channel does not support `?include=`). + * + * @see ../../hooks/useNotePageEvents.ts + * @see https://github.com/otomatty/zedi/issues/860 + */ +import type { NotePageWindowItem } from "./types"; + +/** + * SSE で受信するノートイベント名定数。`EventSource.addEventListener(name, ...)` + * に渡す。`ready` は接続確立直後のハロー、それ以外はサーバ側の DB ミューテーション + * 後に publish される。 + * + * Constant tuple of SSE event names. Used as the `event` field both on the + * wire and as argument to `EventSource.addEventListener`. `ready` is the + * hello that the server emits on connect; the other four are produced by the + * page / member / note metadata mutation handlers. + */ +export const NOTE_EVENT_NAMES = [ + "ready", + "page.added", + "page.updated", + "page.deleted", + "note.permission_changed", +] as const; + +/** Union of valid SSE event names for this feed. */ +export type NoteEventName = (typeof NOTE_EVENT_NAMES)[number]; + +/** + * `ready` イベントの payload。接続が確立して購読が始まったことを示すだけの + * ハロー。フロント側は invalidate トリガとして使う(再接続のラウンドで + * 切断中の取りこぼしを補修するため)。 + * + * `ready` event payload. The server sends it immediately after subscribing + * the client; the frontend uses it as the trigger to invalidate the pages + * window cache once, covering anything that mutated while the connection + * was being established or after a reconnect. + */ +export interface NoteReadyEventData { + note_id: string; +} + +/** + * `page.added` / `page.updated` の payload。`page` は `NotePageWindowItem` と + * 同形なので `useInfiniteNotePages` のキャッシュへ直接挿入できる。 + * + * Payload for `page.added` / `page.updated`. The `page` slot is a + * `NotePageWindowItem` so it can be spliced directly into the + * `useInfiniteNotePages` React Query cache. + */ +export interface NotePageEventData { + note_id: string; + page: NotePageWindowItem; +} + +/** + * `page.deleted` の payload。サーバは消したページ id だけを通知し、 + * クライアントは window の items 配列からその id をフィルタする。 + * + * Payload for `page.deleted`. The server only sends the deleted page id; + * the client removes it from every cached window for that note. + */ +export interface NotePageDeletedEventData { + note_id: string; + page_id: string; +} + +/** + * `note.permission_changed` の payload。ノートの visibility / edit_permission / + * member / domain rule のいずれかが変わったことを示すセンチネル。クライアントは + * details / window / members を invalidate する。 + * + * Payload for `note.permission_changed`. Sentinel that signals one of + * visibility / edit_permission / member / domain rule changed. The client + * invalidates the note's details, pages window, and members cache so the + * next render re-evaluates access. + */ +export interface NotePermissionChangedEventData { + note_id: string; +} + +/** + * SSE feed が配信するイベントの discriminated union。サーバ側の `NoteEvent` + * (`server/api/src/services/noteEventBroadcaster.ts`)と shape が一致する。 + * + * Discriminated union of events sent over the SSE feed. Matches the + * server-side `NoteEvent` in `noteEventBroadcaster.ts` slot-for-slot. + */ +export type NoteEvent = + | { type: "page.added"; note_id: string; page: NotePageWindowItem } + | { type: "page.updated"; note_id: string; page: NotePageWindowItem } + | { type: "page.deleted"; note_id: string; page_id: string } + | { type: "note.permission_changed"; note_id: string };