Skip to content

Commit 2b4a391

Browse files
committed
feat(team-mode): stream message.part.delta events to member FIFO
SDK v1 Event union does not include EventMessagePartDelta, but OpenCode >=1.2.0 emits message.part.delta for streaming reasoning / text chunks (see background-agent/manager.ts line 1007). Without this handling, long provider responses that arrive as incremental deltas never reach the tmux panes, making live streaming effectively broken for any non-trivial output. Extend the hook with a narrow custom union covering the runtime shape { sessionID, partID?, field?, delta } and write deltas straight to the member FIFO. Add unit tests that cover: - multiple deltas appending to the same FIFO - non-text fields (e.g., tool) being ignored Oracle review blocker (reported by oracle session verifying #3493).
1 parent 143fe3b commit 2b4a391

2 files changed

Lines changed: 166 additions & 107 deletions

File tree

src/hooks/team-session-streamer/hook.test.ts

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,4 +95,77 @@ describe("createTeamSessionStreamer", () => {
9595
expect(listActiveTeams).toHaveBeenCalledTimes(1)
9696
expect(loadRuntimeState).toHaveBeenCalledTimes(1)
9797
})
98+
99+
test("streams incremental message.part.delta events to member fifo", async () => {
100+
// given
101+
const listActiveTeams = mock(async () => [{
102+
teamRunId: "11111111-1111-4111-8111-111111111111",
103+
teamName: "team-alpha",
104+
status: "active",
105+
memberCount: 1,
106+
scope: "project" as const,
107+
}])
108+
const loadRuntimeState = mock(async () => createRuntimeState())
109+
const config = TeamModeConfigSchema.parse({ enabled: true, tmux_visualization: true })
110+
const streamer = createTeamSessionStreamer(config, { listActiveTeams, loadRuntimeState })
111+
112+
// when
113+
await streamer.event({
114+
event: {
115+
type: "message.part.delta",
116+
properties: {
117+
sessionID: "member-session",
118+
partID: "part-delta",
119+
field: "text",
120+
delta: "chunk-one ",
121+
},
122+
},
123+
})
124+
await streamer.event({
125+
event: {
126+
type: "message.part.delta",
127+
properties: {
128+
sessionID: "member-session",
129+
partID: "part-delta",
130+
field: "text",
131+
delta: "chunk-two",
132+
},
133+
},
134+
})
135+
136+
// then
137+
expect(writeTeamSessionFifoMock).toHaveBeenCalledTimes(2)
138+
expect(writeTeamSessionFifoMock).toHaveBeenNthCalledWith(1, "/tmp/omo-team/11111111-1111-4111-8111-111111111111/member-a.fifo", "chunk-one ")
139+
expect(writeTeamSessionFifoMock).toHaveBeenNthCalledWith(2, "/tmp/omo-team/11111111-1111-4111-8111-111111111111/member-a.fifo", "chunk-two")
140+
})
141+
142+
test("ignores delta events for non-text fields", async () => {
143+
// given
144+
const listActiveTeams = mock(async () => [{
145+
teamRunId: "11111111-1111-4111-8111-111111111111",
146+
teamName: "team-alpha",
147+
status: "active",
148+
memberCount: 1,
149+
scope: "project" as const,
150+
}])
151+
const loadRuntimeState = mock(async () => createRuntimeState())
152+
const config = TeamModeConfigSchema.parse({ enabled: true, tmux_visualization: true })
153+
const streamer = createTeamSessionStreamer(config, { listActiveTeams, loadRuntimeState })
154+
155+
// when
156+
await streamer.event({
157+
event: {
158+
type: "message.part.delta",
159+
properties: {
160+
sessionID: "member-session",
161+
partID: "part-other",
162+
field: "tool",
163+
delta: "ignored",
164+
},
165+
},
166+
})
167+
168+
// then
169+
expect(writeTeamSessionFifoMock).not.toHaveBeenCalled()
170+
})
98171
})
Lines changed: 93 additions & 107 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import type { Event, EventMessagePartUpdated, Part } from "@opencode-ai/sdk"
2+
13
import type { TeamModeConfig } from "../../config/schema/team-mode"
24
import { getTeamMemberFifoPath } from "../../features/team-mode/team-layout-tmux/fifo-path"
35
import * as teamStateStore from "../../features/team-mode/team-state-store"
@@ -6,106 +8,80 @@ import { writeTeamSessionFifo } from "./fifo-writer"
68

79
type TeamStateStore = Pick<typeof teamStateStore, "listActiveTeams" | "loadRuntimeState">
810

11+
type MessagePartDeltaEvent = {
12+
type: "message.part.delta"
13+
properties: {
14+
sessionID: string
15+
partID?: string
16+
field?: string
17+
delta: string
18+
}
19+
}
20+
21+
type StreamEvent = Event | MessagePartDeltaEvent
22+
923
type TeamSessionStreamTarget = {
1024
teamRunId: string
1125
memberName: string
1226
fifoPath: string
1327
}
1428

15-
type TeamSessionStreamEvent = {
16-
type: string
17-
properties?: unknown
18-
}
19-
20-
type HookInput = {
21-
event?: unknown
22-
}
23-
24-
type HookImpl = {
25-
event: (input: HookInput) => Promise<void>
26-
}
29+
type HookInput = { event: StreamEvent }
30+
type HookImpl = { event: (input: HookInput) => Promise<void> }
2731

2832
const DROPPABLE_FIFO_ERROR_CODES = new Set(["ENXIO", "ENOENT", "EPIPE"])
2933

30-
function isRecord(value: unknown): value is Record<string, unknown> {
31-
return typeof value === "object" && value !== null
32-
}
33-
3434
function isErrorWithCode(error: unknown): error is Error & { code: string } {
3535
return error instanceof Error && "code" in error && typeof error.code === "string"
3636
}
3737

38-
function normalizeEvent(input: HookInput): TeamSessionStreamEvent | undefined {
39-
if (!isRecord(input.event) || typeof input.event.type !== "string") return undefined
40-
return {
41-
type: input.event.type,
42-
properties: input.event.properties,
43-
}
38+
function extractCumulativeText(part: Part): string | undefined {
39+
if ("text" in part && typeof part.text === "string") return part.text
40+
return undefined
4441
}
4542

46-
function getDeletedSessionID(properties: unknown): string | undefined {
47-
if (!isRecord(properties) || !isRecord(properties.info)) return undefined
48-
return typeof properties.info.id === "string" ? properties.info.id : undefined
49-
}
43+
function extractUpdateSegment(
44+
event: EventMessagePartUpdated,
45+
partTextByKey: Map<string, string>,
46+
): { sessionID: string; text: string } | undefined {
47+
const part = event.properties.part
48+
const delta = event.properties.delta
49+
const partKey = `${part.sessionID}:${part.id}`
5050

51-
function getRemovedPartRef(properties: unknown): { sessionID: string; partID: string } | undefined {
52-
if (!isRecord(properties)) return undefined
53-
return typeof properties.sessionID === "string" && typeof properties.partID === "string"
54-
? { sessionID: properties.sessionID, partID: properties.partID }
55-
: undefined
56-
}
51+
if (typeof delta === "string" && delta.length > 0) {
52+
const previousText = partTextByKey.get(partKey) ?? ""
53+
partTextByKey.set(partKey, previousText + delta)
54+
return { sessionID: part.sessionID, text: delta }
55+
}
5756

58-
function getErroredSessionID(properties: unknown): string | undefined {
59-
return isRecord(properties) && typeof properties.sessionID === "string" ? properties.sessionID : undefined
60-
}
57+
const cumulativeText = extractCumulativeText(part)
58+
if (cumulativeText === undefined) return undefined
6159

62-
function getTextPart(properties: unknown): { sessionID: string; partID: string; text: string } | undefined {
63-
if (!isRecord(properties) || !isRecord(properties.part)) {
64-
return undefined
65-
}
60+
const previousText = partTextByKey.get(partKey) ?? ""
61+
partTextByKey.set(partKey, cumulativeText)
6662

67-
const part = properties.part
68-
const sessionID = typeof properties.sessionID === "string"
69-
? properties.sessionID
70-
: typeof part.sessionID === "string"
71-
? part.sessionID
72-
: undefined
73-
74-
if (!sessionID) return undefined
75-
if (typeof part.id !== "string") return undefined
76-
if (typeof part.text === "string") {
77-
return { sessionID, partID: part.id, text: part.text }
78-
}
79-
if (typeof part.content === "string") {
80-
return { sessionID, partID: part.id, text: part.content }
81-
}
63+
const appendedText = cumulativeText.startsWith(previousText)
64+
? cumulativeText.slice(previousText.length)
65+
: cumulativeText
66+
if (appendedText.length === 0) return undefined
67+
return { sessionID: part.sessionID, text: appendedText }
8268
}
8369

84-
function getUpdatedSegment(
85-
properties: unknown,
70+
function extractDeltaSegment(
71+
event: MessagePartDeltaEvent,
8672
partTextByKey: Map<string, string>,
8773
): { sessionID: string; text: string } | undefined {
88-
if (!isRecord(properties)) return undefined
89-
90-
const delta = typeof properties.delta === "string" ? properties.delta : undefined
91-
const part = getTextPart(properties)
92-
if (!part) return undefined
93-
const partKey = `${part.sessionID}:${part.partID}`
74+
const { sessionID, partID, field, delta } = event.properties
75+
if (typeof delta !== "string" || delta.length === 0) return undefined
76+
if (field !== undefined && field !== "text" && field !== "content") return undefined
9477

95-
if (delta && delta.length > 0) {
78+
if (partID) {
79+
const partKey = `${sessionID}:${partID}`
9680
const previousText = partTextByKey.get(partKey) ?? ""
9781
partTextByKey.set(partKey, previousText + delta)
98-
return { sessionID: part.sessionID, text: delta }
9982
}
10083

101-
const previousText = partTextByKey.get(partKey) ?? ""
102-
partTextByKey.set(partKey, part.text)
103-
const appendedText = part.text.startsWith(previousText)
104-
? part.text.slice(previousText.length)
105-
: part.text
106-
107-
if (appendedText.length === 0) return undefined
108-
return { sessionID: part.sessionID, text: appendedText }
84+
return { sessionID, text: delta }
10985
}
11086

11187
function clearSessionPartState(sessionID: string, partTextByKey: Map<string, string>): void {
@@ -151,61 +127,71 @@ export function createTeamSessionStreamer(config: TeamModeConfig, stateStore: Te
151127
return undefined
152128
}
153129

130+
async function writeSegment(
131+
sessionID: string,
132+
text: string,
133+
): Promise<void> {
134+
const target = await resolveStreamTarget(sessionID)
135+
if (!target) return
136+
137+
try {
138+
await writeTeamSessionFifo(target.fifoPath, text)
139+
} catch (error) {
140+
if (isErrorWithCode(error) && DROPPABLE_FIFO_ERROR_CODES.has(error.code)) {
141+
if (error.code === "ENOENT") {
142+
streamTargetsBySession.delete(sessionID)
143+
}
144+
return
145+
}
146+
147+
log("team session streamer write failed", {
148+
event: "team-mode-session-streamer-write-error",
149+
teamRunId: target.teamRunId,
150+
memberName: target.memberName,
151+
sessionID,
152+
fifoPath: target.fifoPath,
153+
error: error instanceof Error ? error.message : String(error),
154+
})
155+
}
156+
}
157+
154158
return {
155-
event: async (input: HookInput): Promise<void> => {
156-
const event = normalizeEvent(input)
157-
if (!event) return
159+
event: async ({ event }: HookInput): Promise<void> => {
158160
if (!config.enabled || !config.tmux_visualization) return
159161

160162
if (event.type === "session.deleted") {
161-
const sessionID = getDeletedSessionID(event.properties)
162-
if (!sessionID) return
163+
const sessionID = event.properties.info.id
163164
streamTargetsBySession.delete(sessionID)
164165
clearSessionPartState(sessionID, partTextByKey)
165166
return
166167
}
167168

168169
if (event.type === "message.part.removed") {
169-
const removedPart = getRemovedPartRef(event.properties)
170-
if (!removedPart) return
171-
partTextByKey.delete(`${removedPart.sessionID}:${removedPart.partID}`)
170+
partTextByKey.delete(`${event.properties.sessionID}:${event.properties.partID}`)
172171
return
173172
}
174173

175174
if (event.type === "session.error") {
176-
const sessionID = getErroredSessionID(event.properties)
177-
if (!sessionID) return
178-
streamTargetsBySession.delete(sessionID)
179-
clearSessionPartState(sessionID, partTextByKey)
175+
const sessionID = event.properties.sessionID
176+
if (sessionID) {
177+
streamTargetsBySession.delete(sessionID)
178+
clearSessionPartState(sessionID, partTextByKey)
179+
}
180+
return
181+
}
182+
183+
if (event.type === "message.part.delta") {
184+
const segment = extractDeltaSegment(event, partTextByKey)
185+
if (!segment) return
186+
await writeSegment(segment.sessionID, segment.text)
180187
return
181188
}
182189

183190
if (event.type !== "message.part.updated") return
184-
const segment = getUpdatedSegment(event.properties, partTextByKey)
191+
const segment = extractUpdateSegment(event, partTextByKey)
185192
if (!segment) return
186193

187-
const target = await resolveStreamTarget(segment.sessionID)
188-
if (!target) return
189-
190-
try {
191-
await writeTeamSessionFifo(target.fifoPath, segment.text)
192-
} catch (error) {
193-
if (isErrorWithCode(error) && DROPPABLE_FIFO_ERROR_CODES.has(error.code)) {
194-
if (error.code === "ENOENT") {
195-
streamTargetsBySession.delete(segment.sessionID)
196-
}
197-
return
198-
}
199-
200-
log("team session streamer write failed", {
201-
event: "team-mode-session-streamer-write-error",
202-
teamRunId: target.teamRunId,
203-
memberName: target.memberName,
204-
sessionID: segment.sessionID,
205-
fifoPath: target.fifoPath,
206-
error: error instanceof Error ? error.message : String(error),
207-
})
208-
}
194+
await writeSegment(segment.sessionID, segment.text)
209195
},
210196
}
211197
}

0 commit comments

Comments
 (0)