-
Notifications
You must be signed in to change notification settings - Fork 236
Expand file tree
/
Copy pathqueue.ts
More file actions
120 lines (106 loc) · 3.81 KB
/
queue.ts
File metadata and controls
120 lines (106 loc) · 3.81 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
import { z } from 'zod/v4';
// The two literal prefixes a queue name is allowed to start with.
const stepQueuePrefix = z.literal('__wkf_step_');
const workflowQueuePrefix = z.literal('__wkf_workflow_');

/**
 * Schema accepting exactly one of the known queue-name prefixes:
 * `'__wkf_step_'` or `'__wkf_workflow_'`.
 */
export const QueuePrefix = z.union([stepQueuePrefix, workflowQueuePrefix]);

/** String-literal union inferred from the `QueuePrefix` schema. */
export type QueuePrefix = z.infer<typeof QueuePrefix>;
/**
 * Schema for a queue name: a `QueuePrefix` value immediately followed by a
 * string suffix.
 */
export const ValidQueueName = z.templateLiteral([QueuePrefix, z.string()]);
/** Template-literal string type inferred from the `ValidQueueName` schema. */
export type ValidQueueName = z.infer<typeof ValidQueueName>;
/**
 * Schema for a stored queue message ID. The value is validated as a string
 * and branded as `'MessageId'`, so the inferred type is distinct from a
 * plain `string` at the type level.
 */
export const MessageId = z
.string()
.brand<'MessageId'>()
.describe('A stored queue message ID');
/** Branded string type inferred from the `MessageId` schema. */
export type MessageId = z.infer<typeof MessageId>;
/**
 * OpenTelemetry trace context for distributed tracing,
 * validated as a record whose keys and values are all strings.
 */
export const TraceCarrierSchema = z.record(z.string(), z.string());
/** String-to-string record type inferred from `TraceCarrierSchema`. */
export type TraceCarrier = z.infer<typeof TraceCarrierSchema>;
/**
 * Run creation data carried through the queue for resilient start.
 * Only present on the first queue delivery — re-enqueues omit this.
 * When the runtime processes the message, it passes this data to the
 * run_started event so the server can create the run if it doesn't exist yet.
 */
export const RunInputSchema = z.object({
// Accepted without validation (any value passes).
// NOTE(review): presumably the input given to the workflow — confirm against start().
input: z.unknown(),
// NOTE(review): presumably identifies the deployment that owns the run — confirm.
deploymentId: z.string(),
// NOTE(review): presumably the name of the workflow to run — confirm.
workflowName: z.string(),
// Any number is accepted; the schema does not restrict it to integers.
specVersion: z.number(),
// Optional string-keyed record; the values are not validated.
executionContext: z.record(z.string(), z.any()).optional(),
});
/** Object type inferred from `RunInputSchema`. */
export type RunInput = z.infer<typeof RunInputSchema>;
/**
 * Schema for the workflow-invoke queue payload. Only `runId` is required;
 * every other field is optional.
 */
export const WorkflowInvokePayloadSchema = z.object({
runId: z.string(),
/** Optional trace context, validated by `TraceCarrierSchema`. */
traceCarrier: TraceCarrierSchema.optional(),
/** Optional; when present the input is coerced to a `Date` by `z.coerce.date()`. */
requestedAt: z.coerce.date().optional(),
/** Number of times this message has been re-enqueued due to server errors (5xx) */
serverErrorRetryCount: z.number().int().optional(),
/** Run creation data, only present on the first queue delivery from start() */
runInput: RunInputSchema.optional(),
});
/**
 * Schema for the step-invoke queue payload. The workflow name, run ID,
 * workflow start time and step ID are required; trace context and request
 * time are optional.
 */
export const StepInvokePayloadSchema = z.object({
workflowName: z.string(),
workflowRunId: z.string(),
// Validated only as a number.
// NOTE(review): presumably an epoch timestamp — confirm the unit with the producer.
workflowStartedAt: z.number(),
stepId: z.string(),
/** Optional trace context, validated by `TraceCarrierSchema`. */
traceCarrier: TraceCarrierSchema.optional(),
/** Optional; when present the input is coerced to a `Date` by `z.coerce.date()`. */
requestedAt: z.coerce.date().optional(),
});
/** Object type inferred from `WorkflowInvokePayloadSchema`. */
export type WorkflowInvokePayload = z.infer<typeof WorkflowInvokePayloadSchema>;

/** Object type inferred from `StepInvokePayloadSchema`. */
export type StepInvokePayload = z.infer<typeof StepInvokePayloadSchema>;

/**
 * Schema for the health check message, which exists to verify that the
 * queue pipeline can deliver messages to workflow/step endpoints.
 * `__healthCheck` must be the literal `true`; `correlationId` is a string.
 */
export const HealthCheckPayloadSchema = z.object({
  __healthCheck: z.literal(true),
  correlationId: z.string(),
});

/** Object type inferred from `HealthCheckPayloadSchema`. */
export type HealthCheckPayload = z.infer<typeof HealthCheckPayloadSchema>;
/**
 * Union of the three payload shapes a queue message may take: workflow
 * invoke, step invoke, and health check. Zod tests union members in the
 * order listed and returns the first one that validates.
 */
export const QueuePayloadSchema = z.union([
WorkflowInvokePayloadSchema,
StepInvokePayloadSchema,
HealthCheckPayloadSchema,
]);
/** Union type inferred from `QueuePayloadSchema`. */
export type QueuePayload = z.infer<typeof QueuePayloadSchema>;
/**
 * Optional settings accepted by `Queue.queue()` when enqueuing a message.
 * Every field may be omitted.
 */
export interface QueueOptions {
// NOTE(review): presumably targets a specific deployment instead of the current one — confirm with implementations.
deploymentId?: string;
// NOTE(review): presumably used to de-duplicate repeated enqueues of the same message — confirm with implementations.
idempotencyKey?: string;
// NOTE(review): presumably extra headers forwarded with the queued message — confirm with implementations.
headers?: Record<string, string>;
/** Delay message delivery by this many seconds */
delaySeconds?: number;
/** Spec version of the target run. Used to select the queue transport format. */
specVersion?: number;
}
/**
 * Metadata passed to a queue message handler as its second argument,
 * alongside the message itself.
 */
type QueueMessageMeta = {
  // NOTE(review): presumably the delivery attempt counter — confirm whether it starts at 0 or 1.
  attempt: number;
  queueName: ValidQueueName;
  messageId: MessageId;
  requestId?: string;
};

/**
 * Callback given to `Queue.createQueueHandler`. The message arrives as
 * `unknown`, so the callback must validate it before use. It resolves to
 * either nothing or an object carrying `timeoutSeconds`.
 * NOTE(review): the meaning of `timeoutSeconds` (likely a redelivery delay)
 * is not visible here — confirm with the queue implementations.
 */
type QueueMessageHandler = (
  message: unknown,
  meta: QueueMessageMeta
  // biome-ignore lint/suspicious/noConfusingVoidType: it is what it is
) => Promise<void | { timeoutSeconds: number }>;

/**
 * Contract for a queue backend: reports a deployment ID, enqueues payloads,
 * and builds HTTP handlers that process queued messages.
 */
export interface Queue {
  /** Resolves to a deployment ID string. */
  getDeploymentId(): Promise<string>;

  /**
   * Enqueues a message to the specified queue.
   *
   * @param queueName - The name of the queue to which the message will be sent.
   * @param message - The content of the message to be sent to the queue.
   * @param opts - Optional parameters for the queue operation.
   * @returns An object whose `messageId` is the stored message's ID, or `null`.
   */
  queue(
    queueName: ValidQueueName,
    message: QueuePayload,
    opts?: QueueOptions
  ): Promise<{ messageId: MessageId | null }>;

  /**
   * Creates an HTTP queue handler for processing messages from a specific queue.
   *
   * @param queueNamePrefix - Queue-name prefix the handler is created for.
   * @param handler - Callback that receives each message and its metadata.
   * @returns A function mapping an incoming `Request` to a `Response` promise.
   */
  createQueueHandler(
    queueNamePrefix: QueuePrefix,
    handler: QueueMessageHandler
  ): (req: Request) => Promise<Response>;
}