Skip to content

Commit 9d6fa0c

Browse files
authored
refactor(cf): Queue-based CFEvents + property tests (#135)
* test(cf): add property-based tests for CF detection schemas 12 fast-check property tests using Schema.toArbitrary for CloudflareInfo, CloudflareResult, and CloudflareSnapshot. Validates round-trip encode/decode, type constraints (finite, integer), required fields, and JSON serialization. * refactor(cf): replace createCFEvents closure with Queue-based event pipeline Replace the createCFEvents frozen closure in cloudflare-event-emitter.ts with a Queue.unbounded<CFEvent>() pipeline. The old closure mixed three concerns (tracker mutation, CDP event emission, replay marker injection) and used a _setRealEmit closure hack for late-binding. New architecture: - CFEvent TaggedEnum (6 variants) in cf-event-types.ts - makeCFEventPipeline() in cf-event-queue.ts with Stream consumer - Queue.offerUnsafe for synchronous publishing from any context - Match.exhaustive consumer with compile-time variant coverage Call site changes: 34 sites across 3 files (detector: 20, state-tracker: 11, solver crash handler: 1, solver layer: 5). SolverEvents consumers (45 call sites across 4 files) required ZERO changes — interface unchanged. Deleted: createCFEvents function, CFEvents type, _setRealEmit hack.
1 parent 679dbac commit 9d6fa0c

10 files changed

+531
-207
lines changed

package-lock.json

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,7 @@
199199
"@commitlint/config-conventional": "^20.4.4",
200200
"@effect/vitest": "4.0.0-beta.31",
201201
"env-cmd": "^11.0.0",
202+
"fast-check": "^4.6.0",
202203
"lefthook": "^1.11.13",
203204
"vite": "^8.0.0",
204205
"vitest": "^4.1.0"

src/session/cf/cf-event-queue.ts

Lines changed: 188 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,188 @@
1+
/**
2+
* CF Event Queue — Queue.unbounded pipeline for CF event processing.
3+
*
4+
* Replaces createCFEvents frozen closure. Single FIFO consumer handles
5+
* tracker mutation, CDP event emission, and replay marker injection.
6+
*
7+
* Queue.offerUnsafe is synchronous — safe from Resolution callbacks,
8+
* scope finalizers, and fiber crash handlers.
9+
*/
10+
import { Effect, Latch, Match, Queue, Stream } from 'effect';
11+
12+
import { runForkInServer } from '../../otel-runtime.js';
13+
import { CdpSessionId } from '../../shared/cloudflare-detection.js';
14+
import type { TargetId, CloudflareResult } from '../../shared/cloudflare-detection.js';
15+
import { CloudflareTracker } from './cloudflare-event-emitter.js';
16+
import type { ActiveDetection, EmitClientEvent, InjectMarker } from './cloudflare-event-emitter.js';
17+
import { Resolution } from './cf-resolution.js';
18+
import type { CFEvent } from './cf-event-types.js';
19+
20+
export interface CFEventPipelineDeps {
21+
readonly injectMarker: InjectMarker;
22+
readonly emitClientEvent: () => EmitClientEvent;
23+
readonly sessionId: string;
24+
readonly shouldRecordMarkers: () => boolean;
25+
}
26+
27+
export interface CFEventPipeline {
28+
readonly queue: Queue.Queue<CFEvent>;
29+
/** Fork this as a detached fiber — it drains the queue until shutdown. */
30+
readonly consumer: Effect.Effect<void>;
31+
}
32+
33+
/**
34+
* Create a CF event pipeline with a Queue and Stream-based consumer.
35+
*
36+
* The consumer reproduces exact behavior of the old createCFEvents methods:
37+
* - Progress: tracker.onProgress + emitClientEvent + marker
38+
* - Solved: tracker.snapshot + log + emitClientEvent + marker
39+
* - Failed: tracker.snapshot + log + emitClientEvent + marker
40+
* - Detected: emitClientEvent
41+
* - Marker: injectMarker (if shouldRecordMarkers)
42+
* - StandaloneAutoSolved: construct synthetic active + Detected + Solved
43+
*/
44+
export function makeCFEventPipeline(deps: CFEventPipelineDeps): CFEventPipeline {
45+
const queue = Effect.runSync(Queue.unbounded<CFEvent>());
46+
47+
const marker = (targetId: TargetId, tag: string, payload?: object): void => {
48+
if (deps.shouldRecordMarkers()) {
49+
deps.injectMarker(targetId, tag, payload);
50+
}
51+
};
52+
53+
const handleEvent = (event: CFEvent): Effect.Effect<void> =>
54+
Effect.sync(() => {
55+
Match.value(event).pipe(
56+
Match.tag('Detected', ({ active }) => {
57+
deps.emitClientEvent()('Browserless.cloudflareDetected', {
58+
type: active.info.type,
59+
url: active.info.url,
60+
iframeUrl: active.info.iframeUrl,
61+
cRay: active.info.cRay,
62+
detectionMethod: active.info.detectionMethod,
63+
pollCount: active.info.pollCount || 1,
64+
targetId: active.pageTargetId,
65+
}).catch((e) => runForkInServer(Effect.logDebug(`emitDetected failed: ${e instanceof Error ? e.message : String(e)}`)));
66+
}),
67+
68+
Match.tag('Progress', ({ active, state, extra }) => {
69+
active.tracker.onProgress(state, extra);
70+
deps.emitClientEvent()('Browserless.cloudflareProgress', {
71+
state,
72+
elapsed_ms: Date.now() - active.startTime,
73+
attempt: active.attempt,
74+
targetId: active.pageTargetId,
75+
...extra,
76+
}).catch((e) => runForkInServer(Effect.logDebug(`emitProgress failed: ${e instanceof Error ? e.message : String(e)}`)));
77+
marker(active.pageTargetId, 'cf.state_change', { state, ...extra });
78+
}),
79+
80+
Match.tag('Solved', ({ active, result, cf_summary_label, skipMarker }) => {
81+
const snap = active.tracker.snapshot();
82+
const timingStr = snap.checkbox_to_click_ms != null
83+
? ` checkbox_to_click_ms=${snap.checkbox_to_click_ms} phase4_ms=${snap.phase4_duration_ms}`
84+
: '';
85+
runForkInServer(Effect.logInfo(`CF solved: session=${deps.sessionId.slice(0, 8)} type=${result.type} method=${result.method} duration=${result.duration_ms}ms${timingStr}`));
86+
deps.emitClientEvent()('Browserless.cloudflareSolved', {
87+
...result,
88+
token_length: result.token_length ?? result.token?.length ?? 0,
89+
targetId: active.pageTargetId,
90+
summary: active.tracker.snapshot(),
91+
cf_summary_label: cf_summary_label ?? '',
92+
}).catch((e) => runForkInServer(Effect.logDebug(`emitSolved failed: ${e instanceof Error ? e.message : String(e)}`)));
93+
if (!skipMarker) {
94+
marker(active.pageTargetId, 'cf.solved', {
95+
type: result.type, method: result.method, duration_ms: result.duration_ms,
96+
phase_label: result.phase_label, signal: result.signal,
97+
});
98+
}
99+
}),
100+
101+
Match.tag('Failed', ({ active, reason, duration, phaseLabel, cf_summary_label, skipMarker, cf_verified }) => {
102+
const phase_label = phaseLabel ?? `✗ ${reason}`;
103+
const cfVerified = cf_verified ?? false;
104+
const snap = active.tracker.snapshot();
105+
const isRechallenge = (active.rechallengeCount ?? 0) > 0;
106+
const diag = snap.widget_diag;
107+
const diagStr = diag ? ` diag_alive=${diag.alive} diag_cbI=${diag.cbI} diag_inp=${diag.inp} diag_shadow=${diag.shadow} diag_bodyLen=${diag.bodyLen}` : '' ;
108+
const timingStr = snap.checkbox_to_click_ms != null
109+
? ` checkbox_to_click_ms=${snap.checkbox_to_click_ms} phase4_ms=${snap.phase4_duration_ms}`
110+
: '';
111+
runForkInServer(Effect.logWarning(`CF failed: session=${deps.sessionId.slice(0, 8)} reason=${reason} type=${active.info.type} method=${active.info.detectionMethod} target=${active.pageTargetId.slice(0, 8)} duration=${duration}ms attempts=${active.attempt} oopif_url=${active.info.url || 'none'} rechallenge=${isRechallenge} cf_verified=${cfVerified} widget_error_count=${snap.widget_error_count} widget_error_type=${snap.widget_error_type ?? 'none'} click_count=${snap.click_count} false_positives=${snap.false_positive_count}${diagStr}${timingStr}`));
112+
deps.emitClientEvent()('Browserless.cloudflareFailed', {
113+
reason, type: active.info.type, duration_ms: duration, attempts: active.attempt,
114+
targetId: active.pageTargetId,
115+
oopif_url: active.info.url,
116+
summary: snap,
117+
phase_label,
118+
cf_summary_label,
119+
cf_verified: cfVerified,
120+
}).catch((e) => runForkInServer(Effect.logDebug(`emitFailed failed: ${e instanceof Error ? e.message : String(e)}`)));
121+
if (!skipMarker) {
122+
marker(active.pageTargetId, 'cf.failed', { reason, duration_ms: duration, phase_label, oopif_url: active.info.url, rechallenge: isRechallenge, cf_verified: cfVerified });
123+
}
124+
}),
125+
126+
Match.tag('Marker', ({ targetId, tag, payload }) => {
127+
marker(targetId, tag, payload);
128+
}),
129+
130+
Match.tag('StandaloneAutoSolved', ({ targetId, signal, tokenLength, cdpSessionId }) => {
131+
const info = {
132+
type: 'turnstile' as const, url: '', detectionMethod: signal,
133+
};
134+
const abortLatch = Latch.makeUnsafe(false);
135+
abortLatch.openUnsafe();
136+
const active: ActiveDetection = {
137+
info, pageCdpSessionId: cdpSessionId || CdpSessionId.makeUnsafe(''), pageTargetId: targetId,
138+
startTime: Date.now(), attempt: 0, aborted: true,
139+
tracker: new CloudflareTracker(info),
140+
abortLatch,
141+
resolution: Resolution.makeUnsafe(),
142+
};
143+
144+
// Emit detected
145+
deps.emitClientEvent()('Browserless.cloudflareDetected', {
146+
type: active.info.type, url: active.info.url, iframeUrl: active.info.iframeUrl,
147+
cRay: active.info.cRay, detectionMethod: active.info.detectionMethod,
148+
pollCount: active.info.pollCount || 1, targetId: active.pageTargetId,
149+
}).catch((e) => runForkInServer(Effect.logDebug(`emitDetected failed: ${e instanceof Error ? e.message : String(e)}`)));
150+
if (targetId) {
151+
marker(targetId, 'cf.detected', { type: 'turnstile' });
152+
}
153+
154+
// Emit solved
155+
const result: CloudflareResult = {
156+
solved: true, type: 'turnstile', method: 'auto_solve',
157+
duration_ms: 0, attempts: 0, auto_resolved: true,
158+
signal, token_length: tokenLength, phase_label: '→',
159+
};
160+
runForkInServer(Effect.logInfo(`CF solved: session=${deps.sessionId.slice(0, 8)} type=turnstile method=auto_solve duration=0ms`));
161+
deps.emitClientEvent()('Browserless.cloudflareSolved', {
162+
...result,
163+
token_length: tokenLength,
164+
targetId: active.pageTargetId,
165+
summary: active.tracker.snapshot(),
166+
cf_summary_label: 'Emb→',
167+
}).catch((e) => runForkInServer(Effect.logDebug(`emitSolved failed: ${e instanceof Error ? e.message : String(e)}`)));
168+
marker(active.pageTargetId, 'cf.solved', {
169+
type: 'turnstile', method: 'auto_solve', duration_ms: 0,
170+
phase_label: '→', signal,
171+
});
172+
}),
173+
174+
Match.exhaustive,
175+
);
176+
});
177+
178+
const consumer = Stream.fromQueue(queue).pipe(
179+
Stream.runForEach(handleEvent),
180+
Effect.catchCause((cause) =>
181+
Effect.logError('CF event queue consumer crashed').pipe(
182+
Effect.annotateLogs({ cause: String(cause) }),
183+
),
184+
),
185+
);
186+
187+
return { queue, consumer };
188+
}

src/session/cf/cf-event-types.ts

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/**
2+
* CFEvent — tagged enum for all CF event types.
3+
*
4+
* Replaces the createCFEvents frozen closure with a discriminated union
5+
* that flows through a Queue.unbounded pipeline. Each variant carries
6+
* exactly the data needed by the consumer.
7+
*
8+
* Queue.offerUnsafe is synchronous — safe from Resolution callbacks,
9+
* scope finalizers, and any other sync context.
10+
*/
11+
import { Data } from 'effect';
12+
13+
import type { TargetId, CloudflareResult } from '../../shared/cloudflare-detection.js';
14+
import type { CdpSessionId } from '../../shared/cloudflare-detection.js';
15+
import type { ReadonlyActiveDetection } from './cloudflare-event-emitter.js';
16+
17+
export type CFEvent = Data.TaggedEnum<{
18+
/** CF challenge detected on a page. */
19+
Detected: {
20+
readonly active: ReadonlyActiveDetection;
21+
};
22+
/** Solver progress update — tracker mutation + CDP event + marker. */
23+
Progress: {
24+
readonly active: ReadonlyActiveDetection;
25+
readonly state: string;
26+
readonly extra?: Record<string, any>;
27+
};
28+
/** CF challenge solved — snapshot + log + CDP event + marker. */
29+
Solved: {
30+
readonly active: ReadonlyActiveDetection;
31+
readonly result: CloudflareResult;
32+
readonly cf_summary_label?: string;
33+
readonly skipMarker?: boolean;
34+
};
35+
/** CF challenge failed — snapshot + log + CDP event + marker. */
36+
Failed: {
37+
readonly active: ReadonlyActiveDetection;
38+
readonly reason: string;
39+
readonly duration: number;
40+
readonly phaseLabel?: string;
41+
readonly cf_summary_label?: string;
42+
readonly skipMarker?: boolean;
43+
readonly cf_verified?: boolean;
44+
};
45+
/** Inject a replay marker (no tracker mutation, no CDP event). */
46+
Marker: {
47+
readonly targetId: TargetId;
48+
readonly tag: string;
49+
readonly payload?: object;
50+
};
51+
/** Standalone auto-solved — construct synthetic detection + emit detected + solved. */
52+
StandaloneAutoSolved: {
53+
readonly targetId: TargetId;
54+
readonly signal: string;
55+
readonly tokenLength: number;
56+
readonly cdpSessionId?: CdpSessionId;
57+
};
58+
}>;
59+
export const CFEvent = Data.taggedEnum<CFEvent>();

src/session/cf/cf-session-state.ts

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import { Effect, Exit, Scope } from 'effect';
1010
import { runForkInServer } from '../../otel-runtime.js';
1111
import type { CdpSessionId, TargetId, CloudflareConfig, CloudflareType } from '../../shared/cloudflare-detection.js';
1212
import { isInterstitialType } from '../../shared/cloudflare-detection.js';
13-
import type { CFEvents } from './cloudflare-event-emitter.js';
13+
import { CFEvent } from './cf-event-types.js';
1414
import { DetectionRegistry } from './cf-detection-registry.js';
1515

1616
export class SessionSolverState {
@@ -37,7 +37,7 @@ export class SessionSolverState {
3737
/** Per-page accumulator of solved/failed phases for compound summary labels. */
3838
private readonly summaryPhases = new Map<TargetId, { type: string; label: string }[]>();
3939

40-
constructor(protected events: CFEvents) {
40+
constructor(protected cfPublish: (event: CFEvent) => void) {
4141
this.registry = new DetectionRegistry((active, signal) => {
4242
const duration = Date.now() - active.startTime;
4343

@@ -46,16 +46,20 @@ export class SessionSolverState {
4646
runForkInServer(Effect.logInfo(`Scope finalizer fallback: verified_session_close for ${active.pageTargetId}`));
4747
this.pushPhase(active.pageTargetId, active.info.type, phaseLabel);
4848
const compoundLabel = this.buildCompoundLabel(active.pageTargetId);
49-
this.events.emitFailed(active, 'verified_session_close', duration, phaseLabel, compoundLabel,
50-
{ cf_verified: true });
49+
this.cfPublish(CFEvent.Failed({
50+
active, reason: 'verified_session_close', duration, phaseLabel, cf_summary_label: compoundLabel,
51+
cf_verified: true,
52+
}));
5153
return;
5254
}
5355

5456
const failLabel = `✗ ${signal}`;
5557
runForkInServer(Effect.logInfo(`Scope finalizer fallback: emitting failed for orphaned detection on ${active.pageTargetId}`));
5658
this.pushPhase(active.pageTargetId, active.info.type, failLabel);
5759
const compoundLabel = this.buildCompoundLabel(active.pageTargetId);
58-
this.events.emitFailed(active, signal, duration, failLabel, compoundLabel);
60+
this.cfPublish(CFEvent.Failed({
61+
active, reason: signal, duration, phaseLabel: failLabel, cf_summary_label: compoundLabel,
62+
}));
5963
});
6064
}
6165

0 commit comments

Comments
 (0)