-
Notifications
You must be signed in to change notification settings - Fork 237
Expand file tree
/
Copy pathprivate.ts
More file actions
167 lines (148 loc) · 5.07 KB
/
private.ts
File metadata and controls
167 lines (148 loc) · 5.07 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
/**
* Utils used by the bundler when transforming code
*/
import type { EncryptionKeyLike } from './encryption.js';
import type { EventsConsumer } from './events-consumer.js';
import type { QueueItem } from './global.js';
import type { Serializable } from './schemas.js';
/**
 * An async step function: callable with `Args` and resolving to `Result`,
 * optionally carrying step metadata as properties on the function object.
 *
 * @typeParam Args - Argument list of the step; defaults to `any[]`.
 * @typeParam Result - Type the returned promise resolves to; defaults to `unknown`.
 */
export type StepFunction<
Args extends Serializable[] = any[],
Result extends Serializable | unknown = unknown,
> = ((...args: Args) => Promise<Result>) & {
// NOTE(review): presumably a retry limit honored by the step runtime — it is
// not read anywhere in this file; confirm against the consumer.
maxRetries?: number;
// Assigned by registerStepFunction when the step is registered.
stepId?: string;
};
// Registry of step functions keyed by step ID. Written by
// registerStepFunction and read by the lookup helpers in this file.
const registeredSteps = new Map<string, StepFunction>();
// Bare step names that qualify for the suffix-based fallback lookup in
// getBuiltinResponseStepAlias.
const BUILTIN_RESPONSE_STEP_NAMES = new Set([
'__builtin_response_array_buffer',
'__builtin_response_json',
'__builtin_response_text',
]);
/**
 * Compute alternative step IDs that refer to the same workflow module under a
 * different root directory.
 *
 * A step ID is expected to look like `step//<modulePath>//<fnName>`. When the
 * module path starts with `./workflows/`, `./example/workflows/` or
 * `./src/workflows/`, the two IDs using the other roots are returned (in the
 * order `./`, `./example/`, `./src/`, skipping the input's own root).
 *
 * @param stepId - The step ID to derive aliases for.
 * @returns The alias step IDs, or an empty array when `stepId` does not have
 * exactly three `//`-separated parts, does not start with `step`, or its
 * module path is not under one of the known workflow roots.
 */
function getStepIdAliasCandidates(stepId: string): string[] {
  const [scheme, modulePath, fnName, ...extra] = stepId.split('//');
  if (
    scheme !== 'step' ||
    modulePath === undefined ||
    fnName === undefined ||
    extra.length > 0
  ) {
    return [];
  }

  // Roots under which a `workflows/` directory may appear.
  const roots = ['./', './example/', './src/'];
  const ownRoot = roots.find((root) =>
    modulePath.startsWith(`${root}workflows/`)
  );
  if (ownRoot === undefined) {
    return [];
  }

  // Path from `workflows/` onward, shared by every alias.
  const relativePath = modulePath.slice(ownRoot.length);
  return roots
    .filter((root) => root !== ownRoot)
    .map((root) => `step//${root}${relativePath}//${fnName}`);
}
/**
 * Fallback lookup for builtin response steps requested by their bare name.
 *
 * @param stepId - The requested step ID.
 * @returns The first registered step (in registration order) whose ID ends
 * with `//<stepId>`, or `undefined` when `stepId` is not one of the builtin
 * response step names or no registered ID has that suffix.
 */
function getBuiltinResponseStepAlias(stepId: string): StepFunction | undefined {
  if (!BUILTIN_RESPONSE_STEP_NAMES.has(stepId)) {
    return undefined;
  }
  const qualifiedSuffix = `//${stepId}`;
  const hit = Array.from(registeredSteps.entries()).find(([registeredId]) =>
    registeredId.endsWith(qualifiedSuffix)
  );
  return hit?.[1];
}
/**
 * Add a step function to the registry served in the server bundle.
 *
 * The function is stored under `stepId` (replacing any earlier entry with the
 * same ID) and is then tagged with a `stepId` property for serialization
 * support.
 *
 * @param stepId - Key under which the step is registered.
 * @param stepFn - The step function; mutated to carry `stepId`.
 */
export function registerStepFunction(
  stepId: string,
  stepFn: StepFunction
): void {
  // Register first, then tag: the order matches the original so a throwing
  // property assignment still leaves the registry entry in place.
  registeredSteps.set(stepId, stepFn);
  stepFn.stepId = stepId;
}
/**
 * Resolve a registered step function from a step ID.
 *
 * Lookup order:
 * 1. the exact `stepId`;
 * 2. equivalent workflow path aliases (to support mixed symlink
 *    environments);
 * 3. the builtin response step suffix fallback.
 *
 * @param stepId - The step ID to resolve.
 * @returns The matching step function, or `undefined` when nothing matches.
 */
export function getStepFunction(stepId: string): StepFunction | undefined {
  // The exact ID is tried first, followed by its path aliases in order.
  const candidateIds = [stepId, ...getStepIdAliasCandidates(stepId)];
  for (const candidateId of candidateIds) {
    const registered = registeredSteps.get(candidateId);
    if (registered) {
      return registered;
    }
  }
  return getBuiltinResponseStepAlias(stepId) || undefined;
}
/**
* Get closure variables for the current step function
* @internal
*/
export { __private_getClosureVars } from './step/get-closure-vars.js';
/**
 * Shared state for a workflow orchestration context.
 *
 * Within this file only `promiseQueue` and `pendingDeliveries` are read (by
 * `scheduleWhenIdle`); the other fields are not used here.
 */
export interface WorkflowOrchestratorContext {
// NOTE(review): presumably the ID of the workflow run being orchestrated — confirm.
runId: string;
// May be `undefined`; NOTE(review): presumably meaning encryption is not in use — confirm.
encryptionKey: EncryptionKeyLike | undefined;
// NOTE(review): presumably the global object of the environment the workflow code runs in — confirm.
globalThis: typeof globalThis;
eventsConsumer: EventsConsumer;
/**
* Map of pending invocations keyed by correlationId.
* Using Map instead of Array for O(1) lookup/delete operations.
*/
invocationsQueue: Map<string, QueueItem>;
// Callback that receives a workflow error.
onWorkflowError: (error: Error) => void;
// ID generators supplied by the host. NOTE(review): presumably injected so
// that generated IDs are deterministic across replays — confirm.
generateUlid: () => string;
generateNanoid: () => string;
/**
* Sequential promise queue that ensures all event-driven promise resolutions
* (step results, hook payloads, failures, suspensions) happen in event log
* order. Every resolve, reject, or workflow error is chained through this
* queue so that even if individual operations take variable time (e.g.,
* async decryption), promises resolve deterministically.
*/
promiseQueue: Promise<void>;
/**
* Counter of in-flight async data delivery operations (step result
* hydration, hook payload hydration). Suspensions must wait for this
* to reach 0 before firing, to avoid preempting data delivery.
*/
pendingDeliveries: number;
}
/**
 * Run `fn` once no data deliveries are pending on `ctx`.
 *
 * The callback is never invoked synchronously. A `setTimeout(0)` poll checks
 * `ctx.pendingDeliveries`; while it is greater than zero, the poll waits for
 * the current `ctx.promiseQueue` to settle successfully and then schedules
 * another `setTimeout(0)` check. Re-reading `promiseQueue` on every round
 * covers the multi-round delivery pattern in which each hook payload delivery
 * cycle appends new async work to the queue.
 *
 * @param ctx - Orchestrator context whose `pendingDeliveries` and
 * `promiseQueue` are polled.
 * @param fn - Callback fired once `pendingDeliveries` is no longer above zero.
 */
export function scheduleWhenIdle(
  ctx: WorkflowOrchestratorContext,
  fn: () => void
): void {
  function pollUntilIdle(): void {
    if (ctx.pendingDeliveries > 0) {
      // Deliveries are still in flight: let the queue drain, then look again.
      ctx.promiseQueue.then(() => {
        setTimeout(pollUntilIdle, 0);
      });
      return;
    }
    fn();
  }

  setTimeout(pollUntilIdle, 0);
}