-
Notifications
You must be signed in to change notification settings - Fork 237
Expand file tree
/
Copy pathresume-hook.ts
More file actions
312 lines (291 loc) · 9.92 KB
/
resume-hook.ts
File metadata and controls
312 lines (291 loc) · 9.92 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
import { waitUntil } from '@vercel/functions';
import {
ERROR_SLUGS,
HookNotFoundError,
WorkflowRuntimeError,
} from '@workflow/errors';
import {
type Hook,
isLegacySpecVersion,
SPEC_VERSION_CURRENT,
SPEC_VERSION_LEGACY,
type WorkflowInvokePayload,
type WorkflowRun,
} from '@workflow/world';
import { getRunCapabilities } from '../capabilities.js';
import { type CryptoKey, importKey } from '../encryption.js';
import {
dehydrateStepReturnValue,
hydrateStepArguments,
SerializationFormat,
} from '../serialization.js';
import { WEBHOOK_RESPONSE_WRITABLE } from '../symbols.js';
import * as Attribute from '../telemetry/semantic-conventions.js';
import { getSpanContextForTraceCarrier, trace } from '../telemetry.js';
import { waitedUntil } from '../util.js';
import { getWorkflowQueueName } from './helpers.js';
import { getWorld } from './world.js';
/**
 * Looks up a hook by its token and gathers everything needed to act on it:
 * the hook itself, the workflow run it belongs to, and the imported
 * encryption key for that run (if the world provides one).
 *
 * When the hook carries `metadata`, it is hydrated in place before returning.
 *
 * @param token - The unique token identifying the hook
 * @returns The hook, its workflow run, and the encryption key
 *   (`undefined` when the world exposes no key for the run)
 */
async function getHookByTokenWithKey(token: string): Promise<{
  hook: Hook;
  run: WorkflowRun;
  encryptionKey: CryptoKey | undefined;
}> {
  const world = getWorld();
  const hook = await world.hooks.getByToken(token);
  const run = await world.runs.get(hook.runId);

  // `getEncryptionKeyForRun` is optional on the world; a missing method or a
  // falsy result both leave the key undefined.
  let encryptionKey: CryptoKey | undefined;
  const keyMaterial = await world.getEncryptionKeyForRun?.(run);
  if (keyMaterial) {
    encryptionKey = await importKey(keyMaterial);
  }

  // Nothing to hydrate when the hook was created without metadata.
  if (hook.metadata === undefined) {
    return { hook, run, encryptionKey };
  }

  hook.metadata = await hydrateStepArguments(
    hook.metadata as any,
    hook.runId,
    encryptionKey
  );
  return { hook, run, encryptionKey };
}
/**
 * Fetches the hook identified by `token`, with its `metadata` property
 * hydrated when one is present.
 *
 * The associated workflow run and encryption key are resolved internally
 * but are not exposed to the caller.
 *
 * @param token - The unique token identifying the hook
 * @returns Promise resolving to the hook
 */
export async function getHookByToken(token: string): Promise<Hook> {
  const lookup = await getHookByTokenWithKey(token);
  return lookup.hook;
}
/**
 * Resumes a workflow run by sending a payload to a hook identified by its token.
 *
 * This function is called externally (e.g., from an API route or server action)
 * to send data to a hook and resume the associated workflow run. It records a
 * `hook_received` event carrying the serialized payload and then re-queues the
 * workflow run so it can pick the event up.
 *
 * @param tokenOrHook - The unique token identifying the hook, or the hook object itself
 * @param payload - The data payload to send to the hook
 * @param encryptionKeyOverride - Optional encryption key to use instead of the
 *   one resolved from the world for the hook's run. It is still discarded when
 *   the target run does not support the encrypted serialization format.
 * @returns Promise resolving to the hook
 * @throws Rethrows any error raised while resolving the hook, serializing the
 *   payload, creating the event, or queueing the workflow run
 *
 * @example
 *
 * ```ts
 * // In an API route
 * import { resumeHook } from '@workflow/core/runtime';
 *
 * export async function POST(request: Request) {
 *   const { token, data } = await request.json();
 *
 *   try {
 *     const hook = await resumeHook(token, data);
 *     return Response.json({ runId: hook.runId });
 *   } catch (error) {
 *     return new Response('Hook not found', { status: 404 });
 *   }
 * }
 * ```
 */
export async function resumeHook<T = any>(
  tokenOrHook: string | Hook,
  payload: T,
  encryptionKeyOverride?: CryptoKey
): Promise<Hook> {
  return await waitedUntil(() => {
    return trace('hook.resume', async (span) => {
      const world = getWorld();

      // A hook object handed in by the caller is already "found"; a token only
      // counts as found once the lookup below has resolved. This keeps the
      // `HookFound(false)` span attribute from being attached to failures that
      // happen after the hook was resolved (serialization, event creation,
      // queueing).
      let hookFound = typeof tokenOrHook !== 'string';

      try {
        let hook: Hook;
        let workflowRun: WorkflowRun;
        let encryptionKey: CryptoKey | undefined;

        if (typeof tokenOrHook === 'string') {
          const result = await getHookByTokenWithKey(tokenOrHook);
          hookFound = true;
          hook = result.hook;
          workflowRun = result.run;
          encryptionKey = encryptionKeyOverride ?? result.encryptionKey;
        } else {
          hook = tokenOrHook;
          workflowRun = await world.runs.get(hook.runId);
          if (encryptionKeyOverride) {
            encryptionKey = encryptionKeyOverride;
          } else {
            const rawKey = await world.getEncryptionKeyForRun?.(workflowRun);
            encryptionKey = rawKey ? await importKey(rawKey) : undefined;
          }
        }

        span?.setAttributes({
          ...Attribute.HookToken(hook.token),
          ...Attribute.HookId(hook.hookId),
          ...Attribute.WorkflowRunId(hook.runId),
        });

        // Check the target run's capabilities to ensure we encode the
        // payload in a format the run's deployment can decode. For example,
        // runs created before encryption support was added cannot decode
        // the 'encr' serialization format.
        const rawVersion = workflowRun.executionContext?.workflowCoreVersion;
        const { supportedFormats } = getRunCapabilities(
          typeof rawVersion === 'string' ? rawVersion : undefined
        );
        if (!supportedFormats.has(SerializationFormat.ENCRYPTED)) {
          encryptionKey = undefined;
        }

        // Dehydrate the payload for storage. Side operations started during
        // dehydration are collected in `ops`.
        const ops: Promise<any>[] = [];
        const v1Compat = isLegacySpecVersion(hook.specVersion);
        const dehydratedPayload = await dehydrateStepReturnValue(
          payload,
          hook.runId,
          encryptionKey,
          ops,
          globalThis,
          v1Compat
        );

        // NOTE: Workaround instead of catching `undefined` unhandled
        // rejections in the webhook bundle: rejections whose reason is
        // `undefined` are dropped, every other rejection is rethrown.
        waitUntil(
          Promise.all(ops).catch((err) => {
            if (err !== undefined) throw err;
          })
        );

        // Create a hook_received event with the payload
        await world.events.create(
          hook.runId,
          {
            eventType: 'hook_received',
            specVersion: SPEC_VERSION_CURRENT,
            correlationId: hook.hookId,
            eventData: {
              payload: dehydratedPayload,
            },
          },
          { v1Compat }
        );

        span?.setAttributes({
          ...Attribute.WorkflowName(workflowRun.workflowName),
        });

        // Link this span to the trace of the workflow run, when one was recorded.
        const traceCarrier = workflowRun.executionContext?.traceCarrier;
        if (traceCarrier) {
          const context = await getSpanContextForTraceCarrier(traceCarrier);
          if (context) {
            span?.addLink?.({ context });
          }
        }

        // Re-trigger the workflow against the deployment ID associated
        // with the workflow run that the hook belongs to
        await world.queue(
          getWorkflowQueueName(workflowRun.workflowName),
          {
            runId: hook.runId,
            // attach the trace carrier from the workflow run
            traceCarrier: traceCarrier ?? undefined,
          } satisfies WorkflowInvokePayload,
          {
            deploymentId: workflowRun.deploymentId,
            specVersion: workflowRun.specVersion ?? SPEC_VERSION_LEGACY,
          }
        );

        return hook;
      } catch (err) {
        span?.setAttributes({
          ...Attribute.HookToken(
            typeof tokenOrHook === 'string' ? tokenOrHook : tokenOrHook.token
          ),
          // Only flag the hook as missing when resolving it is what failed.
          ...(hookFound ? {} : Attribute.HookFound(false)),
        });
        throw err;
      }
    });
  });
}
/**
 * Resumes a webhook by sending a {@link https://developer.mozilla.org/en-US/docs/Web/API/Request | Request}
 * object to a hook identified by its token.
 *
 * This function is called externally (e.g., from an API route or server action)
 * to send a request to a webhook and resume the associated workflow run.
 *
 * The response is chosen from the hook's `respondWith` metadata:
 * - absent: a `202` response with no body
 * - a `Response` instance: that response
 * - `'manual'`: the first `Response` written by the workflow run to the
 *   writable stream attached to the request
 *
 * @param token - The unique token identifying the hook
 * @param request - The request to send to the hook
 * @returns Promise resolving to the response
 * @throws HookNotFoundError if the hook is explicitly marked as not being a webhook
 * @throws WorkflowRuntimeError if `respondWith` has an unsupported value, or if
 *   no response was produced
 * @throws Rethrows any error from looking up or resuming the hook
 *
 * @example
 *
 * ```ts
 * // In an API route
 * import { resumeWebhook } from '@workflow/core/runtime';
 *
 * export async function POST(request: Request) {
 *   const url = new URL(request.url);
 *   const token = url.searchParams.get('token');
 *
 *   if (!token) {
 *     return new Response('Missing token', { status: 400 });
 *   }
 *
 *   try {
 *     const response = await resumeWebhook(token, request);
 *     return response;
 *   } catch (error) {
 *     return new Response('Webhook not found', { status: 404 });
 *   }
 * }
 * ```
 */
export async function resumeWebhook(
  token: string,
  request: Request
): Promise<Response> {
  const { hook, encryptionKey } = await getHookByTokenWithKey(token);

  // Only webhooks can be resumed via the public endpoint.
  // If the hook is explicitly marked as a non-webhook (`isWebhook === false`),
  // throw the same "not found" error the world would throw for a missing
  // token. This prevents leaking that the token is valid.
  // Hooks where `isWebhook` is not set are still accepted.
  // NOTE(review): presumably this keeps hooks created before the flag existed
  // working — confirm before tightening to `!== true`.
  if (hook.isWebhook === false) {
    throw new HookNotFoundError(token);
  }

  let response: Response | undefined;
  let responseReadable: ReadableStream<Response> | undefined;

  if (
    hook.metadata &&
    typeof hook.metadata === 'object' &&
    'respondWith' in hook.metadata
  ) {
    if (hook.metadata.respondWith === 'manual') {
      const { readable, writable } = new TransformStream<Response, Response>();
      responseReadable = readable;
      // The request instance includes the writable stream which will be used
      // to write the response to the client from within the workflow run
      (request as any)[WEBHOOK_RESPONSE_WRITABLE] = writable;
    } else if (hook.metadata.respondWith instanceof Response) {
      response = hook.metadata.respondWith;
    } else {
      throw new WorkflowRuntimeError(
        `Invalid \`respondWith\` value: ${hook.metadata.respondWith}`,
        { slug: ERROR_SLUGS.WEBHOOK_INVALID_RESPOND_WITH_VALUE }
      );
    }
  } else {
    // No `respondWith` value implies the default behavior of returning a 202
    response = new Response(null, { status: 202 });
  }

  await resumeHook(hook, request, encryptionKey);

  if (responseReadable) {
    // Wait for the readable stream to emit one chunk,
    // which is the `Response` object
    const reader = responseReadable.getReader();
    const chunk = await reader.read();
    if (chunk.value) {
      response = chunk.value;
    }
    // Best-effort cleanup: the chunk has already been read, so a failure to
    // cancel must neither delay the response nor surface as an unhandled
    // promise rejection.
    void reader.cancel().catch(() => undefined);
  }

  if (!response) {
    throw new WorkflowRuntimeError('Workflow run did not send a response', {
      slug: ERROR_SLUGS.WEBHOOK_RESPONSE_NOT_SENT,
    });
  }
  return response;
}