diff --git a/.github/scripts/check-language-parity.sh b/.github/scripts/check-language-parity.sh new file mode 100755 index 0000000..e2db50a --- /dev/null +++ b/.github/scripts/check-language-parity.sh @@ -0,0 +1,75 @@ +#!/usr/bin/env bash +# +# Language parity check. +# +# command-stream ships two implementations that must stay in lock-step: the +# JavaScript library under js/src/** and the Rust library under rust/src/**. +# This script fails when a pull request changes one language's source without +# touching the other's, so that behavioral changes are always made in both +# languages (see issue #155 review feedback). +# +# Escape hatch: add the `parity-exempt` label to the PR for changes that are +# legitimately single-language (the workflow skips this check when the label is +# present). +# +# Environment: +# BASE_REF - the base branch to diff against (default: main). In GitHub +# Actions this is github.base_ref. +# +# Usage (locally): +# BASE_REF=main bash .github/scripts/check-language-parity.sh +set -euo pipefail + +BASE_REF="${BASE_REF:-main}" + +# Make sure the base branch is available locally, then resolve a ref we can diff +# against. Prefer the remote-tracking ref; fall back to the bare branch name. +git fetch --no-tags origin "${BASE_REF}" >/dev/null 2>&1 || true +if git rev-parse --verify --quiet "origin/${BASE_REF}" >/dev/null; then + BASE="origin/${BASE_REF}" +elif git rev-parse --verify --quiet "${BASE_REF}" >/dev/null; then + BASE="${BASE_REF}" +else + echo "::warning::Could not resolve base ref '${BASE_REF}'; skipping parity check." + exit 0 +fi + +MERGE_BASE="$(git merge-base "${BASE}" HEAD 2>/dev/null || echo "${BASE}")" +CHANGED="$(git diff --name-only "${MERGE_BASE}" HEAD)" + +echo "Comparing against ${BASE} (merge-base ${MERGE_BASE})" +echo "Changed files:" +echo "${CHANGED}" | sed 's/^/ /' + +js_changed=false +rust_changed=false +while IFS= read -r f; do + [ -z "${f}" ] && continue + case "${f}" in + js/src/*) js_changed=true ;; + rust/src/*) rust_changed=true ;; + esac +done < 130). +- Awaiting a command while an external `AbortSignal` fires no longer hangs: the + abort listener is now registered on the await/then path too, so the command + resolves promptly with the configured signal's exit code. diff --git a/js/README.md b/js/README.md index ee09683..b2705b0 100644 --- a/js/README.md +++ b/js/README.md @@ -360,10 +360,83 @@ import { $ } from 'command-stream'; for await (const chunk of $`long-running-command`.stream()) { if (chunk.type === 'stdout') { console.log('Real-time output:', chunk.data.toString()); + } else if (chunk.type === 'exit') { + console.log('Process exited with code:', chunk.code); } } ``` +`stream()` yields `{ type: 'stdout' | 'stderr', data: Buffer }` chunks as output +arrives, followed by a final `{ type: 'exit', code }` chunk when the process +exits. Always guard on `chunk.type` before reading `chunk.data`, since the +`exit` chunk carries `code` instead of `data`. + +The iterator terminates as soon as the process exits, even if a grandchild keeps +the stdout/stderr pipe open (e.g. `sh -c 'background-task & echo done'`). Any +output still buffered is drained within a short grace period (the `exitPumpGrace` +option, default `100`ms) before the lingering reads are aborted, so the loop +never hangs waiting on a pipe the command itself is no longer using. + +#### Stopping the process from inside the loop + +You can stop a long-running command while iterating over it — either by calling +`kill()` on the command, or simply by `break`ing out of the loop (which kills the +process automatically as the iterator is cleaned up): + +```javascript +const cmd = $`some-endless-stream`; + +for await (const chunk of cmd.stream()) { + if (chunk.type === 'stdout') { + console.log(chunk.data.toString()); + if (seenEnoughOutput(chunk)) { + cmd.kill(); // stops the process; the loop then ends with an exit chunk + } + } else if (chunk.type === 'exit') { + console.log('stopped with code', chunk.code); // 143 for the SIGTERM from kill() + } +} + +// Or just break — the process is terminated as the loop unwinds: +for await (const chunk of $`some-endless-stream`.stream()) { + if (chunk.type === 'stdout' && done(chunk)) break; +} +``` + +##### Choosing the stop signal + +`kill()` defaults to `SIGTERM`, but you can stop with any signal. Pass it +explicitly, or configure a default via the `killSignal` option so that an +argument-less `kill()`, a `break`, or an `AbortSignal` all use it: + +```javascript +// Explicit per-call signal: +cmd.kill('SIGINT'); // exit code 130 + +// Configured default — used by kill(), break, and AbortSignal cancellation: +const cmd = $({ killSignal: 'SIGINT' })`some-endless-stream`; +for await (const chunk of cmd.stream()) { + if (chunk.type === 'stdout' && done(chunk)) + cmd.kill(); // sends SIGINT + else if (chunk.type === 'exit') console.log(chunk.code); // 130 +} + +// AbortSignal style also honors killSignal — awaiting resolves promptly when +// the signal fires (it does not hang) with the configured signal's exit code: +const ac = new AbortController(); +const running = $({ + signal: ac.signal, + killSignal: 'SIGINT', +})`some-endless-stream`; +setTimeout(() => ac.abort(), 1000); // stops with SIGINT +const result = await running; +console.log(result.code); // 130 +``` + +command-stream still escalates to `SIGKILL` after delivering the chosen signal +so a process that ignores it is guaranteed to terminate; the reported exit code +reflects the signal you configured. + ### EventEmitter Pattern (Event-driven) ```javascript @@ -912,6 +985,8 @@ The enhanced `$` function returns a `ProcessRunner` instance that extends `Event - `interactive: boolean` - Enable TTY forwarding for interactive commands (requires `stdin: 'inherit'` and TTY environment) - `cwd: string` - Working directory for command - `env: object` - Environment variables +- `exitPumpGrace: number` - Milliseconds to wait for buffered output to drain after the process exits before aborting stdio reads held open by a grandchild (default `100`; see [Async Iteration](#async-iteration-real-time-streaming)) +- `killSignal: string` - Signal used to stop the process when it is killed without an explicit signal — i.e. `kill()` with no argument, `break`ing out of a `stream()` loop, or an external `AbortSignal` firing (default `'SIGTERM'`). An explicit `kill(signal)` argument always overrides this. The reported exit code follows the conventional `128 + signal` mapping (e.g. `SIGTERM` → 143, `SIGINT` → 130, `SIGKILL` → 137) **Override defaults:** diff --git a/js/examples/stream-exit-chunks.mjs b/js/examples/stream-exit-chunks.mjs new file mode 100644 index 0000000..9b6faaa --- /dev/null +++ b/js/examples/stream-exit-chunks.mjs @@ -0,0 +1,49 @@ +#!/usr/bin/env node +// Demonstrates issue #155 fixes for the stream() async iterator: +// 1. stream() yields a final { type: 'exit', code } chunk on process exit. +// 2. stream() (and awaiting a command) no longer hangs when the process has +// exited but a grandchild keeps the stdout pipe open. +import { $ } from '../src/$.mjs'; + +// 1. Observe the exit code from within the async iterator. +console.log('--- exit chunk ---'); +for await (const chunk of $({ + mirror: false, +})`sh -c 'echo hello; exit 3'`.stream()) { + if (chunk.type === 'exit') { + console.log('exit chunk, code =', chunk.code); + } else { + console.log(chunk.type, '=>', chunk.data.toString().trim()); + } +} + +// 2. The backgrounded `sleep` inherits stdout and keeps it open, but the +// iterator still terminates promptly once `sh` itself exits. +console.log('--- no hang with a lingering grandchild ---'); +const start = Date.now(); +for await (const chunk of $({ + mirror: false, +})`sh -c 'sleep 30 & echo done'`.stream()) { + if (chunk.type === 'exit') { + console.log(`finished in ${Date.now() - start}ms with code ${chunk.code}`); + } else { + console.log(chunk.type, '=>', chunk.data.toString().trim()); + } +} + +// 3. Stop a long-running command from inside the loop by calling kill(). +console.log('--- stop from inside the loop ---'); +const endless = $({ + mirror: false, +})`sh -c 'i=0; while true; do i=$((i+1)); echo tick-$i; sleep 0.1; done'`; +let ticks = 0; +for await (const chunk of endless.stream()) { + if (chunk.type === 'stdout') { + console.log('stdout =>', chunk.data.toString().trim()); + if (++ticks >= 3) { + endless.kill(); // ends the loop with an exit chunk + } + } else if (chunk.type === 'exit') { + console.log('stopped with code', chunk.code); + } +} diff --git a/js/experiments/bun-abort-iter.mjs b/js/experiments/bun-abort-iter.mjs new file mode 100644 index 0000000..c691250 --- /dev/null +++ b/js/experiments/bun-abort-iter.mjs @@ -0,0 +1,48 @@ +const start = Date.now(); +const child = Bun.spawn(['sh', '-c', 'sleep 3 & echo done'], { + stdin: 'pipe', + stdout: 'pipe', + stderr: 'pipe', + detached: true, +}); +const iterator = child.stdout[Symbol.asyncIterator](); +let aborted = false; +let abortResolve; +const abortP = new Promise((r) => (abortResolve = r)); +setTimeout(() => { + aborted = true; + abortResolve({ aborted: true }); + console.log('aborting at', Date.now() - start); +}, 150); +(async () => { + try { + while (true) { + const res = await Promise.race([iterator.next(), abortP]); + if (res.aborted) { + console.log('abort detected at', Date.now() - start); + break; + } + if (res.done) { + console.log('done at', Date.now() - start); + break; + } + console.log( + 'chunk at', + Date.now() - start, + new TextDecoder().decode(res.value).trim() + ); + } + } finally { + if (iterator.return) { + await iterator + .return() + .catch((e) => console.log('return err', e.message)); + console.log('iterator.return done at', Date.now() - start); + } + } + console.log('LOOP EXITED at', Date.now() - start); +})(); +setTimeout(() => { + console.log('--- 4s, exit'); + process.exit(0); +}, 4000); diff --git a/js/experiments/bun-exit-timing.mjs b/js/experiments/bun-exit-timing.mjs new file mode 100644 index 0000000..03837a5 --- /dev/null +++ b/js/experiments/bun-exit-timing.mjs @@ -0,0 +1,30 @@ +const start = Date.now(); +const child = Bun.spawn(['sh', '-c', 'sleep 3 & echo done'], { + stdin: 'pipe', + stdout: 'pipe', + stderr: 'pipe', + detached: true, +}); +child.exited.then((code) => + console.log('child.exited resolved at', Date.now() - start, 'code', code) +); +// read stdout +const reader = child.stdout.getReader(); +(async () => { + while (true) { + const { done, value } = await reader.read(); + if (done) { + console.log('stdout EOF at', Date.now() - start); + break; + } + console.log( + 'stdout data at', + Date.now() - start, + new TextDecoder().decode(value).trim() + ); + } +})(); +setTimeout(() => { + console.log('--- 4s mark, exiting'); + process.exit(0); +}, 4000); diff --git a/js/experiments/bun-reader-cancel.mjs b/js/experiments/bun-reader-cancel.mjs new file mode 100644 index 0000000..54fc011 --- /dev/null +++ b/js/experiments/bun-reader-cancel.mjs @@ -0,0 +1,38 @@ +const start = Date.now(); +const child = Bun.spawn(['sh', '-c', 'sleep 3 & echo done'], { + stdin: 'pipe', + stdout: 'pipe', + stderr: 'pipe', + detached: true, +}); +const reader = child.stdout.getReader(); +let cancelled = false; +setTimeout(async () => { + cancelled = true; + console.log('cancel at', Date.now() - start); + await reader.cancel().catch((e) => console.log('cancel err', e.message)); + console.log('cancel resolved at', Date.now() - start); +}, 150); +(async () => { + try { + while (true) { + const { value, done } = await reader.read(); + if (done) { + console.log('done at', Date.now() - start); + break; + } + console.log( + 'chunk at', + Date.now() - start, + new TextDecoder().decode(value).trim() + ); + } + } catch (e) { + console.log('read threw at', Date.now() - start, e.message); + } + console.log('LOOP EXITED at', Date.now() - start); +})(); +setTimeout(() => { + console.log('--- 4s, exit'); + process.exit(0); +}, 4000); diff --git a/js/experiments/node-exit-timing.mjs b/js/experiments/node-exit-timing.mjs new file mode 100644 index 0000000..63a4112 --- /dev/null +++ b/js/experiments/node-exit-timing.mjs @@ -0,0 +1,20 @@ +import cp from 'child_process'; +const start = Date.now(); +const child = cp.spawn('sh', ['-c', 'sleep 3 & echo done'], { + stdio: ['pipe', 'pipe', 'pipe'], + detached: true, +}); +child.on('exit', (code, sig) => + console.log('exit event at', Date.now() - start, 'code', code, 'sig', sig) +); +child.on('close', (code, sig) => + console.log('close event at', Date.now() - start, 'code', code) +); +child.stdout.on('data', (d) => + console.log('stdout data at', Date.now() - start, d.toString().trim()) +); +child.stdout.on('end', () => console.log('stdout end at', Date.now() - start)); +setTimeout(() => { + console.log('--- 4s, exit'); + process.exit(0); +}, 4000); diff --git a/js/experiments/repro-155-code.mjs b/js/experiments/repro-155-code.mjs new file mode 100644 index 0000000..c5ee27a --- /dev/null +++ b/js/experiments/repro-155-code.mjs @@ -0,0 +1,10 @@ +import { $ } from '../src/$.mjs'; +const cmd = $({ mirror: false })`sh -c 'echo out; echo err 1>&2; exit 7'`; +for await (const chunk of cmd.stream()) { + console.log( + 'chunk:', + chunk.type, + 'code=' + (chunk.code ?? ''), + (chunk.data?.toString?.() ?? '').trim() + ); +} diff --git a/js/experiments/repro-155.mjs b/js/experiments/repro-155.mjs new file mode 100644 index 0000000..123ce8e --- /dev/null +++ b/js/experiments/repro-155.mjs @@ -0,0 +1,15 @@ +import { $ } from '../src/$.mjs'; + +const cmd = $`echo hello`; +const types = []; +for await (const chunk of cmd.stream()) { + types.push(chunk.type); + console.log( + 'chunk:', + chunk.type, + chunk.code ?? '', + (chunk.data?.toString?.() ?? '').trim() + ); +} +console.log('TYPES:', JSON.stringify(types)); +console.log('Has exit chunk?', types.includes('exit')); diff --git a/js/experiments/repro-abort-killsignal.mjs b/js/experiments/repro-abort-killsignal.mjs new file mode 100644 index 0000000..ac16f67 --- /dev/null +++ b/js/experiments/repro-abort-killsignal.mjs @@ -0,0 +1,26 @@ +import { $ } from '../src/$.mjs'; +const ac = new AbortController(); +const running = $({ + mirror: false, + signal: ac.signal, + killSignal: 'SIGINT', +})`sh -c 'i=0; while true; do echo a-$i; i=$((i+1)); sleep 0.05; done'`; +setTimeout(() => { + console.log('aborting'); + ac.abort(); +}, 200); +try { + const result = await running; + console.log('resolved exit code:', result.code); +} catch (e) { + console.log( + 'threw:', + e.constructor.name, + e.message, + 'code=', + e.code, + 'result.code=', + e.result?.code + ); +} +console.log('END'); diff --git a/js/experiments/repro-ctrlc.mjs b/js/experiments/repro-ctrlc.mjs new file mode 100644 index 0000000..5fd47b4 --- /dev/null +++ b/js/experiments/repro-ctrlc.mjs @@ -0,0 +1,7 @@ +import { $ } from '../src/$.mjs'; +const runner = $({ stdin: 'test input\n' })`sleep 10`; +const promise = runner.start(); +await new Promise((r) => setTimeout(r, 200)); +runner.kill(); +const result = await promise; +console.log('RESULT CODE:', result.code, 'cancelled:', runner._cancelled); diff --git a/js/experiments/repro-grace.mjs b/js/experiments/repro-grace.mjs new file mode 100644 index 0000000..86937f7 --- /dev/null +++ b/js/experiments/repro-grace.mjs @@ -0,0 +1,24 @@ +import { $ } from '../src/$.mjs'; + +// Compare grace settings in the grandchild-holds-pipe case. +for (const grace of [0, 50, 100, 200]) { + const start = Date.now(); + const cmd = $({ + mirror: false, + exitPumpGrace: grace, + })`sh -c 'sleep 5 & echo done'`; + const out = []; + for await (const chunk of cmd.stream()) { + if (chunk.type === 'stdout') { + out.push(chunk.data.toString().trim()); + } + if (chunk.type === 'exit') { + out.push('exit=' + chunk.code); + } + } + console.log( + `grace=${grace}ms -> elapsed ${Date.now() - start}ms, captured ${JSON.stringify(out)}` + ); + cmd.kill('SIGKILL'); +} +process.exit(0); diff --git a/js/experiments/repro-hang.mjs b/js/experiments/repro-hang.mjs new file mode 100644 index 0000000..581c84d --- /dev/null +++ b/js/experiments/repro-hang.mjs @@ -0,0 +1,28 @@ +import { $ } from '../src/$.mjs'; + +const start = Date.now(); +// sh exits immediately but backgrounds a sleep that inherits stdout, keeping pipe open +const cmd = $`sh -c 'sleep 3 & echo done'`; +const types = []; +const timer = setTimeout(() => { + console.log('STILL HANGING after 1.5s, elapsed', Date.now() - start); +}, 1500); +for await (const chunk of cmd.stream()) { + types.push(chunk.type); + console.log( + 'chunk:', + chunk.type, + chunk.code ?? '', + (chunk.data?.toString?.() ?? '').trim(), + 'at', + Date.now() - start, + 'ms' + ); +} +clearTimeout(timer); +console.log( + 'DONE. elapsed', + Date.now() - start, + 'ms, types:', + JSON.stringify(types) +); diff --git a/js/experiments/repro-killsignal.mjs b/js/experiments/repro-killsignal.mjs new file mode 100644 index 0000000..aa19754 --- /dev/null +++ b/js/experiments/repro-killsignal.mjs @@ -0,0 +1,55 @@ +import { $ } from '../src/$.mjs'; + +// Default SIGTERM -> 143 +{ + const cmd = $({ + mirror: false, + })`sh -c 'i=0; while true; do echo t-$i; i=$((i+1)); sleep 0.05; done'`; + let n = 0, + code; + for await (const ch of cmd.stream()) { + if (ch.type === 'stdout' && ++n >= 2) { + cmd.kill(); + } else if (ch.type === 'exit') { + code = ch.code; + } + } + console.log('default kill exit code:', code, '(expect 143)'); +} + +// Configured SIGINT -> 130 +{ + const cmd = $({ + mirror: false, + killSignal: 'SIGINT', + })`sh -c 'i=0; while true; do echo t-$i; i=$((i+1)); sleep 0.05; done'`; + let n = 0, + code; + for await (const ch of cmd.stream()) { + if (ch.type === 'stdout' && ++n >= 2) { + cmd.kill(); + } // no arg -> uses killSignal + else if (ch.type === 'exit') { + code = ch.code; + } + } + console.log('configured SIGINT exit code:', code, '(expect 130)'); +} + +// Explicit signal arg overrides -> SIGKILL 137 +{ + const cmd = $({ + mirror: false, + })`sh -c 'i=0; while true; do echo t-$i; i=$((i+1)); sleep 0.05; done'`; + let n = 0, + code; + for await (const ch of cmd.stream()) { + if (ch.type === 'stdout' && ++n >= 2) { + cmd.kill('SIGKILL'); + } else if (ch.type === 'exit') { + code = ch.code; + } + } + console.log('explicit SIGKILL exit code:', code, '(expect 137)'); +} +console.log('DONE'); diff --git a/js/experiments/repro-stop-in-loop.mjs b/js/experiments/repro-stop-in-loop.mjs new file mode 100644 index 0000000..6cc6267 --- /dev/null +++ b/js/experiments/repro-stop-in-loop.mjs @@ -0,0 +1,71 @@ +import { $ } from '../src/$.mjs'; + +// Feature: stop the process from inside the stream() loop. +// A long-running producer keeps emitting lines; we stop after 3. + +console.log('--- kill() inside the loop ---'); +{ + const start = Date.now(); + const cmd = $({ + mirror: false, + })`sh -c 'i=0; while true; do i=$((i+1)); echo line-$i; sleep 0.05; done'`; + let count = 0; + const seen = []; + for await (const chunk of cmd.stream()) { + if (chunk.type === 'stdout') { + count += chunk.data.toString().split('\n').filter(Boolean).length; + seen.push(chunk.data.toString().trim()); + if (count >= 3) { + cmd.kill(); // stop the process from inside the loop + } + } else if (chunk.type === 'exit') { + seen.push('EXIT code=' + chunk.code); + } + } + console.log( + 'elapsed', + Date.now() - start, + 'ms; chunks:', + JSON.stringify(seen) + ); +} + +console.log('--- break inside the loop ---'); +{ + const start = Date.now(); + const cmd = $({ + mirror: false, + })`sh -c 'i=0; while true; do i=$((i+1)); echo b-$i; sleep 0.05; done'`; + let count = 0; + for await (const chunk of cmd.stream()) { + if (chunk.type === 'stdout') { + count += 1; + if (count >= 3) { + break; + } + } + } + console.log( + 'elapsed', + Date.now() - start, + 'ms; finished?', + cmd.finished, + 'code', + cmd.result?.code + ); +} + +console.log('--- normal command timing (should NOT add ~100ms) ---'); +{ + const samples = []; + for (let i = 0; i < 3; i++) { + const t = Date.now(); + for await (const chunk of $({ mirror: false })`echo quick`.stream()) { + void chunk; + } + samples.push(Date.now() - t); + } + console.log('elapsed samples ms:', JSON.stringify(samples)); +} + +process.exit(0); diff --git a/js/experiments/verify-normal.mjs b/js/experiments/verify-normal.mjs new file mode 100644 index 0000000..34de948 --- /dev/null +++ b/js/experiments/verify-normal.mjs @@ -0,0 +1,28 @@ +import { $ } from '../src/$.mjs'; + +// 1. Large output - no truncation +const big = await $({ + mirror: false, +})`sh -c 'for i in $(seq 1 5000); do echo "line $i"; done'`; +const lines = big.stdout.trim().split('\n'); +console.log( + 'large output lines:', + lines.length, + 'last:', + lines[lines.length - 1], + 'code:', + big.code +); + +// 2. exit code non-zero +const r = await $({ mirror: false })`sh -c 'exit 42'`.catch((e) => e); +console.log('exit code 42:', r.code); + +// 3. stderr capture +const e = await $({ mirror: false })`sh -c 'echo to-err 1>&2'`; +console.log('stderr:', JSON.stringify(e.stderr.trim())); + +// 4. timing for normal fast command (should be fast, not +100ms) +const t = Date.now(); +await $({ mirror: false })`echo quick`; +console.log('fast command elapsed:', Date.now() - t, 'ms'); diff --git a/js/src/$.process-runner-base.mjs b/js/src/$.process-runner-base.mjs index b778e92..80a2e19 100644 --- a/js/src/$.process-runner-base.mjs +++ b/js/src/$.process-runner-base.mjs @@ -215,6 +215,7 @@ class ProcessRunner extends StreamEmitter { env: undefined, interactive: false, shellOperators: true, + killSignal: 'SIGTERM', ...options, }; diff --git a/js/src/$.process-runner-execution.mjs b/js/src/$.process-runner-execution.mjs index d16f387..9584668 100644 --- a/js/src/$.process-runner-execution.mjs +++ b/js/src/$.process-runner-execution.mjs @@ -8,6 +8,10 @@ import { StreamUtils, safeWrite, asBuffer } from './$.stream-utils.mjs'; import { pumpReadable } from './$.quote.mjs'; import { createResult } from './$.result.mjs'; import { parseShellCommand, needsRealShell } from './shell-parser.mjs'; +import { + createExitPromise, + drainPumpsAfterExit, +} from './$.process-runner-exit.mjs'; const isBun = typeof globalThis.Bun !== 'undefined'; @@ -245,33 +249,37 @@ function setupChildEventListeners(runner) { * @param {number} childPid - Child process PID * @returns {Promise} */ -function createStdoutPump(runner, childPid) { +function createStdoutPump(runner, childPid, signal) { if (!runner.child.stdout) { return Promise.resolve(); } - return pumpReadable(runner.child.stdout, (buf) => { - trace( - 'ProcessRunner', - () => - `stdout data received | ${JSON.stringify({ - pid: childPid, - bufferLength: buf.length, - capture: runner.options.capture, - mirror: runner.options.mirror, - preview: buf.toString().slice(0, 100), - })}` - ); + return pumpReadable( + runner.child.stdout, + (buf) => { + trace( + 'ProcessRunner', + () => + `stdout data received | ${JSON.stringify({ + pid: childPid, + bufferLength: buf.length, + capture: runner.options.capture, + mirror: runner.options.mirror, + preview: buf.toString().slice(0, 100), + })}` + ); - if (runner.options.capture) { - runner.outChunks.push(buf); - } - if (runner.options.mirror) { - safeWrite(process.stdout, buf); - } + if (runner.options.capture) { + runner.outChunks.push(buf); + } + if (runner.options.mirror) { + safeWrite(process.stdout, buf); + } - runner._emitProcessedData('stdout', buf); - }); + runner._emitProcessedData('stdout', buf); + }, + signal + ); } /** @@ -280,33 +288,37 @@ function createStdoutPump(runner, childPid) { * @param {number} childPid - Child process PID * @returns {Promise} */ -function createStderrPump(runner, childPid) { +function createStderrPump(runner, childPid, signal) { if (!runner.child.stderr) { return Promise.resolve(); } - return pumpReadable(runner.child.stderr, (buf) => { - trace( - 'ProcessRunner', - () => - `stderr data received | ${JSON.stringify({ - pid: childPid, - bufferLength: buf.length, - capture: runner.options.capture, - mirror: runner.options.mirror, - preview: buf.toString().slice(0, 100), - })}` - ); + return pumpReadable( + runner.child.stderr, + (buf) => { + trace( + 'ProcessRunner', + () => + `stderr data received | ${JSON.stringify({ + pid: childPid, + bufferLength: buf.length, + capture: runner.options.capture, + mirror: runner.options.mirror, + preview: buf.toString().slice(0, 100), + })}` + ); - if (runner.options.capture) { - runner.errChunks.push(buf); - } - if (runner.options.mirror) { - safeWrite(process.stderr, buf); - } + if (runner.options.capture) { + runner.errChunks.push(buf); + } + if (runner.options.mirror) { + safeWrite(process.stderr, buf); + } - runner._emitProcessedData('stderr', buf); - }); + runner._emitProcessedData('stderr', buf); + }, + signal + ); } /** @@ -415,55 +427,6 @@ function handleStdin(runner, stdin, isInteractive) { return Promise.resolve(); } -/** - * Create promise for child exit - * @param {object} child - Child process - * @returns {Promise} - */ -function createExitPromise(child) { - if (isBun) { - return child.exited; - } - - return new Promise((resolve) => { - trace( - 'ProcessRunner', - () => `Setting up child process event listeners for PID ${child.pid}` - ); - - child.on('close', (code, signal) => { - trace( - 'ProcessRunner', - () => - `Child process close event | ${JSON.stringify({ - pid: child.pid, - code, - signal, - killed: child.killed, - exitCode: child.exitCode, - signalCode: child.signalCode, - })}` - ); - resolve(code); - }); - - child.on('exit', (code, signal) => { - trace( - 'ProcessRunner', - () => - `Child process exit event | ${JSON.stringify({ - pid: child.pid, - code, - signal, - killed: child.killed, - exitCode: child.exitCode, - signalCode: child.signalCode, - })}` - ); - }); - }); -} - /** * Determine final exit code * @param {number|null|undefined} code - Raw exit code @@ -691,6 +654,14 @@ function setupExternalAbortSignal(runner) { return; } + // Guard against double registration: this is invoked both from start() (when + // options are merged) and from _doStartAsync() (so the await/then path also + // honors an AbortSignal). Without this guard the listener would fire twice. + if (runner._externalAbortSetup) { + return; + } + runner._externalAbortSetup = true; + trace( 'ProcessRunner', () => @@ -714,7 +685,9 @@ function setupExternalAbortSignal(runner) { })}` ); - runner.kill('SIGTERM'); + // Honor the configured killSignal (defaults to SIGTERM) so AbortSignal-based + // cancellation uses the same stop signal as an explicit kill(). + runner.kill(); trace( 'ProcessRunner', () => 'Process kill initiated due to external abort signal' @@ -735,7 +708,7 @@ function setupExternalAbortSignal(runner) { () => `External signal already aborted, killing process and aborting internal controller` ); - runner.kill('SIGTERM'); + runner.kill(); if (runner._abortController && !runner._abortController.signal.aborted) { runner._abortController.abort(); } @@ -976,13 +949,18 @@ async function executeChildProcess(runner, argv, config) { } const childPid = runner.child?.pid; - const outPump = createStdoutPump(runner, childPid); - const errPump = createStderrPump(runner, childPid); + const pumpAbort = new AbortController(); + const outPump = createStdoutPump(runner, childPid, pumpAbort.signal); + const errPump = createStderrPump(runner, childPid, pumpAbort.signal); const stdinPumpPromise = handleStdin(runner, stdin, isInteractive); - const exited = createExitPromise(runner.child); + const exited = createExitPromise(runner.child, runner); const code = await exited; - await Promise.all([outPump, errPump, stdinPumpPromise]); + await drainPumpsAfterExit( + runner, + [outPump, errPump, stdinPumpPromise], + pumpAbort + ); const finalExitCode = determineFinalExitCode(code, runner._cancelled); const resultData = buildResultData(runner, finalExitCode); @@ -1115,6 +1093,12 @@ export function attachExecutionMethods(ProcessRunner, deps) { this.started = true; this._mode = 'async'; + // Ensure an external AbortSignal (options.signal) is honored regardless of + // how the runner was started. The await/then path reaches here without + // going through start()'s option-merge branch, so register the listener + // here too (idempotent via the _externalAbortSetup guard). + setupExternalAbortSignal(this); + try { const { cwd, env, stdin } = this.options; diff --git a/js/src/$.process-runner-exit.mjs b/js/src/$.process-runner-exit.mjs new file mode 100644 index 0000000..0c4b10e --- /dev/null +++ b/js/src/$.process-runner-exit.mjs @@ -0,0 +1,177 @@ +// ProcessRunner exit handling - detecting process exit and draining stdio pumps +// Part of the modular ProcessRunner architecture. +// +// These helpers exist to solve issue #155: after a child process exits, its +// stdout/stderr pipes normally reach EOF almost immediately. But if the child +// spawned grandchildren that inherited those pipes (e.g. +// `sh -c 'long-task & echo done'`), the pipes can stay open long after the +// command itself has exited, leaving the output pumps — and therefore finish() +// and any stream() iterator — hanging indefinitely. + +import { trace } from './$.trace.mjs'; + +const isBun = typeof globalThis.Bun !== 'undefined'; + +// Once the process has exited, we wait this long for the pumps to drain +// naturally and then force the lingering readables closed. +export const EXIT_PUMP_GRACE_MS = 100; + +/** + * Wait for the output/stdin pumps to settle after the process has exited, + * without hanging forever if grandchildren keep the stdio pipes open. If the + * pumps do not drain naturally within the grace period, the abort controller + * is triggered so the pumps stop reading and resolve. + * @param {object} runner - ProcessRunner instance + * @param {Promise[]} pumps - Pending pump promises + * @param {AbortController} pumpAbort - Controller that aborts the output pumps + * @returns {Promise} + */ +export async function drainPumpsAfterExit(runner, pumps, pumpAbort) { + const allSettled = Promise.allSettled(pumps); + + const graceMs = + typeof runner.options.exitPumpGrace === 'number' + ? runner.options.exitPumpGrace + : EXIT_PUMP_GRACE_MS; + + if (graceMs > 0) { + let timer; + const grace = new Promise((resolve) => { + timer = setTimeout(resolve, graceMs); + timer.unref?.(); + }); + + const winner = await Promise.race([ + allSettled.then(() => 'pumps'), + grace.then(() => 'grace'), + ]); + clearTimeout(timer); + + if (winner === 'pumps') { + return; + } + + trace( + 'ProcessRunner', + () => + `Pumps still pending ${graceMs}ms after exit; aborting stdio reads (pipes likely held open by a grandchild)` + ); + } + + pumpAbort?.abort(); + await allSettled; +} + +/** + * Create promise for child exit. + * + * Resolves with the child's exit code as soon as the process exits — or as + * soon as the runner is cancelled — without waiting for the stdio pipes to + * close (a grandchild may keep them open indefinitely, issue #155). + * + * @param {object} child - Child process + * @param {object} [runner] - ProcessRunner instance (for cancellation) + * @returns {Promise} + */ +export function createExitPromise(child, runner) { + // Bun's spawn exposes an `exited` promise. Note that even under Bun we may + // have spawned via Node's child_process (e.g. when an explicit stdin pipe is + // needed), in which case `child.exited` is undefined and we must fall back to + // the event-based path below — otherwise `await undefined` resolves + // immediately with the wrong (undefined) exit code. + // + // Even on the Bun path we still race against cancellation: killRunner aborts + // the runner's internal controller, and we must resolve promptly so the + // awaiting caller doesn't hang (see the Node note below). + const signal = runner?._abortController?.signal; + + if (isBun && child.exited && typeof child.exited.then === 'function') { + if (!signal) { + return child.exited; + } + return new Promise((resolve) => { + let resolved = false; + const settle = (code) => { + if (resolved) { + return; + } + resolved = true; + resolve(code); + }; + child.exited.then(settle, () => settle(null)); + if (signal.aborted) { + settle(null); + } else { + signal.addEventListener('abort', () => settle(null), { once: true }); + } + }); + } + + return new Promise((resolve) => { + trace( + 'ProcessRunner', + () => `Setting up child process event listeners for PID ${child.pid}` + ); + + let resolved = false; + const settle = (code) => { + if (resolved) { + return; + } + resolved = true; + resolve(code); + }; + + // killRunner() calls killChildProcess(), which removes all of the child's + // listeners (including the 'exit'/'close' handlers below) before the exit + // event has a chance to fire. Without this, `await exited` would hang + // forever on a kill/cancel. Resolve as soon as the runner is cancelled so + // the caller can finalize with the signal-derived exit code. + if (signal) { + if (signal.aborted) { + settle(null); + } else { + signal.addEventListener('abort', () => settle(null), { once: true }); + } + } + + // Resolve as soon as the process itself exits. We deliberately do NOT wait + // for the 'close' event (which only fires once every stdio stream is also + // closed) because grandchildren that inherited the pipes can keep them + // open long after the command has exited, which would otherwise hang the + // caller indefinitely (issue #155). drainPumpsAfterExit() drains any + // remaining buffered output before the result is finalized. + child.on('exit', (code, exitSignal) => { + trace( + 'ProcessRunner', + () => + `Child process exit event | ${JSON.stringify({ + pid: child.pid, + code, + signal: exitSignal, + killed: child.killed, + exitCode: child.exitCode, + signalCode: child.signalCode, + })}` + ); + settle(code); + }); + + // 'close' is still handled as a fallback in case 'exit' never fires. + child.on('close', (code, closeSignal) => { + trace( + 'ProcessRunner', + () => + `Child process close event | ${JSON.stringify({ + pid: child.pid, + code, + signal: closeSignal, + killed: child.killed, + exitCode: child.exitCode, + signalCode: child.signalCode, + })}` + ); + settle(code); + }); + }); +} diff --git a/js/src/$.process-runner-stream-kill.mjs b/js/src/$.process-runner-stream-kill.mjs index 9a13140..9f6198e 100644 --- a/js/src/$.process-runner-stream-kill.mjs +++ b/js/src/$.process-runner-stream-kill.mjs @@ -1,6 +1,7 @@ // ProcessRunner stream and kill methods - streaming and process termination // Part of the modular ProcessRunner architecture +import os from 'os'; import { trace } from './$.trace.mjs'; import { createResult } from './$.result.mjs'; @@ -48,8 +49,9 @@ function sendSignalToProcess(pid, sig, runtime) { /** * Kill a child process with escalating signals * @param {object} child - Child process object + * @param {string} [signal] - Signal to send first (default 'SIGTERM') */ -function killChildProcess(child) { +function killChildProcess(child, signal = 'SIGTERM') { if (!child || !child.pid) { return; } @@ -58,12 +60,18 @@ function killChildProcess(child) { trace( 'ProcessRunner', () => - `Killing ${runtime} process | ${JSON.stringify({ pid: child.pid }, null, 2)}` + `Killing ${runtime} process | ${JSON.stringify({ pid: child.pid, signal }, null, 2)}` ); + // Send the configured signal first, then escalate to SIGKILL to guarantee + // termination even if the process ignores or handles the first signal. + // When the configured signal already is SIGKILL we skip the redundant second + // delivery. const killOperations = []; - killOperations.push(...sendSignalToProcess(child.pid, 'SIGTERM', runtime)); - killOperations.push(...sendSignalToProcess(child.pid, 'SIGKILL', runtime)); + killOperations.push(...sendSignalToProcess(child.pid, signal, runtime)); + if (signal !== 'SIGKILL') { + killOperations.push(...sendSignalToProcess(child.pid, 'SIGKILL', runtime)); + } trace( 'ProcessRunner', @@ -176,10 +184,19 @@ function cleanupVirtualGenerator(generator, signal) { /** * Get exit code for signal + * + * Uses the conventional 128 + signal-number mapping (so SIGTERM → 143, + * SIGKILL → 137, SIGINT → 130, SIGHUP → 129, …) resolved from the runtime's + * signal table so any configured signal reports a faithful exit code. * @param {string} signal - Signal name * @returns {number} Exit code */ function getSignalExitCode(signal) { + const signals = os.constants?.signals || {}; + if (typeof signals[signal] === 'number') { + return 128 + signals[signal]; + } + // Fallbacks for runtimes that do not expose the signal table. if (signal === 'SIGKILL') { return 137; } @@ -210,7 +227,7 @@ function killRunner(runner, signal) { if (runner.child && !runner.finished) { trace('ProcessRunner', () => `Killing child process ${runner.child.pid}`); try { - killChildProcess(runner.child); + killChildProcess(runner.child, signal); runner.child = null; } catch (err) { trace('ProcessRunner', () => `Error killing process: ${err.message}`); @@ -259,6 +276,20 @@ export function attachStreamKillMethods(ProcessRunner) { } }; + // Yield an explicit { type: 'exit', code } chunk when the process exits so + // consumers can observe the exit code from within the async iterator (see + // issue #155). 'exit' is emitted by finish() right after 'end', so the + // chunk is queued before the iterator drains and terminates. + const onExit = (code) => { + if (!killed) { + buffer.push({ type: 'exit', code }); + if (resolve) { + resolve(); + resolve = _reject = null; + } + } + }; + const onEnd = () => { ended = true; if (resolve) { @@ -268,6 +299,7 @@ export function attachStreamKillMethods(ProcessRunner) { }; this.on('data', onData); + this.on('exit', onExit); this.on('end', onEnd); try { @@ -289,6 +321,7 @@ export function attachStreamKillMethods(ProcessRunner) { } } finally { this.off('data', onData); + this.off('exit', onExit); this.off('end', onEnd); if (!this.finished) { killed = true; @@ -299,14 +332,17 @@ export function attachStreamKillMethods(ProcessRunner) { } }; - ProcessRunner.prototype.kill = function (signal = 'SIGTERM') { + ProcessRunner.prototype.kill = function (signal) { + // When no explicit signal is passed (e.g. the iterator auto-killing on + // break), fall back to the configured `killSignal` option, then SIGTERM. + const effectiveSignal = signal || this.options?.killSignal || 'SIGTERM'; trace( 'ProcessRunner', - () => `kill | signal=${signal} finished=${this.finished}` + () => `kill | signal=${effectiveSignal} finished=${this.finished}` ); if (this.finished) { return; } - killRunner(this, signal); + killRunner(this, effectiveSignal); }; } diff --git a/js/src/$.quote.mjs b/js/src/$.quote.mjs index c92c10e..d9d8c41 100644 --- a/js/src/$.quote.mjs +++ b/js/src/$.quote.mjs @@ -100,39 +100,7 @@ export function buildShellCommand(strings, values) { for (let i = 0; i < strings.length; i++) { out += strings[i]; if (i < values.length) { - const v = values[i]; - if ( - v && - typeof v === 'object' && - Object.prototype.hasOwnProperty.call(v, 'raw') - ) { - trace( - 'Utils', - () => - `BRANCH: buildShellCommand => RAW_VALUE | ${JSON.stringify({ value: String(v.raw) }, null, 2)}` - ); - out += String(v.raw); - } else if ( - v && - typeof v === 'object' && - Object.prototype.hasOwnProperty.call(v, 'literal') - ) { - const literalQuoted = quoteLiteral(v.literal); - trace( - 'Utils', - () => - `BRANCH: buildShellCommand => LITERAL_VALUE | ${JSON.stringify({ original: v.literal, quoted: literalQuoted }, null, 2)}` - ); - out += literalQuoted; - } else { - const quoted = quote(v); - trace( - 'Utils', - () => - `BRANCH: buildShellCommand => QUOTED_VALUE | ${JSON.stringify({ original: v, quoted }, null, 2)}` - ); - out += quoted; - } + out += formatInterpolatedValue(values[i]); } } @@ -144,6 +112,45 @@ export function buildShellCommand(strings, values) { return out; } +/** + * Format a single interpolated value for a shell command: raw values are + * inserted verbatim, { literal } values are double-quoted, everything else is + * shell-quoted. + * @param {*} v - Interpolated value + * @returns {string} Formatted fragment + */ +function formatInterpolatedValue(v) { + const isWrapper = (key) => + v && typeof v === 'object' && Object.prototype.hasOwnProperty.call(v, key); + + if (isWrapper('raw')) { + trace( + 'Utils', + () => + `BRANCH: buildShellCommand => RAW_VALUE | ${JSON.stringify({ value: String(v.raw) }, null, 2)}` + ); + return String(v.raw); + } + + if (isWrapper('literal')) { + const literalQuoted = quoteLiteral(v.literal); + trace( + 'Utils', + () => + `BRANCH: buildShellCommand => LITERAL_VALUE | ${JSON.stringify({ original: v.literal, quoted: literalQuoted }, null, 2)}` + ); + return literalQuoted; + } + + const quoted = quote(v); + trace( + 'Utils', + () => + `BRANCH: buildShellCommand => QUOTED_VALUE | ${JSON.stringify({ original: v, quoted }, null, 2)}` + ); + return quoted; +} + /** * Mark a value as raw (not to be quoted) * @param {*} value - Value to mark as raw @@ -167,7 +174,7 @@ export function raw(value) { * @returns {string} - The double-quoted string with proper escaping */ export function quoteLiteral(value) { - if (value == null) { + if (value === null || value === undefined) { return '""'; } if (Array.isArray(value)) { @@ -200,19 +207,80 @@ export function quoteLiteral(value) { } /** - * Pump a readable stream, calling onChunk for each chunk - * @param {Readable} readable - Readable stream + * Pump a readable stream, calling onChunk for each chunk. + * + * An optional AbortSignal can be supplied to stop pumping even while a read is + * pending. This is required to recover from the case where a process has + * exited but a grandchild keeps the stdio pipe open, which would otherwise + * leave the pump (and the awaiting caller) hanging forever (issue #155). + * + * @param {Readable|ReadableStream} readable - Readable stream * @param {function} onChunk - Callback for each chunk + * @param {AbortSignal} [signal] - Optional signal to abort pumping */ -export async function pumpReadable(readable, onChunk) { +export async function pumpReadable(readable, onChunk, signal) { if (!readable) { trace('Utils', () => 'pumpReadable: No readable stream provided'); return; } trace('Utils', () => 'pumpReadable: Starting to pump readable stream'); - for await (const chunk of readable) { - const { asBuffer } = await import('./$.stream-utils.mjs'); - await onChunk(asBuffer(chunk)); + const { asBuffer } = await import('./$.stream-utils.mjs'); + + // Web/Bun ReadableStream: use an explicit reader so a pending read() can be + // cancelled when the signal aborts. + if (typeof readable.getReader === 'function') { + const reader = readable.getReader(); + const onAbort = () => { + Promise.resolve(reader.cancel()).catch(() => {}); + }; + if (signal) { + if (signal.aborted) { + onAbort(); + } else { + signal.addEventListener('abort', onAbort, { once: true }); + } + } + try { + while (true) { + const { value, done } = await reader.read(); + if (done) { + break; + } + await onChunk(asBuffer(value)); + } + } finally { + if (signal) { + signal.removeEventListener('abort', onAbort); + } + reader.releaseLock?.(); + } + trace('Utils', () => 'pumpReadable: Finished pumping readable stream'); + return; + } + + // Node Readable: destroy() ends the async iteration when aborted. + const onAbort = () => { + try { + readable.destroy(); + } catch { + /* ignore */ + } + }; + if (signal) { + if (signal.aborted) { + onAbort(); + } else { + signal.addEventListener('abort', onAbort, { once: true }); + } + } + try { + for await (const chunk of readable) { + await onChunk(asBuffer(chunk)); + } + } finally { + if (signal) { + signal.removeEventListener('abort', onAbort); + } } trace('Utils', () => 'pumpReadable: Finished pumping readable stream'); } diff --git a/js/tests/stream-exit-chunks.test.mjs b/js/tests/stream-exit-chunks.test.mjs new file mode 100644 index 0000000..36f01dc --- /dev/null +++ b/js/tests/stream-exit-chunks.test.mjs @@ -0,0 +1,262 @@ +import { test, expect } from 'bun:test'; +import './test-helper.mjs'; // Automatically sets up beforeEach/afterEach cleanup +import { $ } from '../src/$.mjs'; + +// Regression tests for issue #155: +// 1. stream() must yield a { type: 'exit', code } chunk when the process exits +// 2. stream() must not hang forever when the process has exited but a +// grandchild keeps the stdio pipes open + +const isWindows = process.platform === 'win32'; + +test('stream() yields an exit chunk with the exit code (success)', async () => { + const cmd = $({ mirror: false })`echo hello`; + const chunks = []; + for await (const chunk of cmd.stream()) { + chunks.push(chunk); + } + + const types = chunks.map((c) => c.type); + expect(types).toContain('stdout'); + expect(types).toContain('exit'); + + // The exit chunk must be the last chunk and carry the exit code. + const exitChunk = chunks[chunks.length - 1]; + expect(exitChunk.type).toBe('exit'); + expect(exitChunk.code).toBe(0); +}); + +test.skipIf(isWindows)( + 'stream() exit chunk reports a non-zero exit code', + async () => { + const cmd = $({ mirror: false })`sh -c 'echo out; exit 7'`; + const chunks = []; + for await (const chunk of cmd.stream()) { + chunks.push(chunk); + } + + const exitChunk = chunks.find((c) => c.type === 'exit'); + expect(exitChunk).toBeDefined(); + expect(exitChunk.code).toBe(7); + } +); + +test.skipIf(isWindows)( + 'stream() does not hang when a grandchild keeps stdout open', + async () => { + // `sh` exits immediately after `echo done`, but the backgrounded `sleep` + // inherits the stdout pipe and keeps it open. Before the fix this caused + // stream() to hang until the sleep finished. + const start = Date.now(); + const cmd = $({ mirror: false })`sh -c 'sleep 5 & echo done'`; + + const types = []; + for await (const chunk of cmd.stream()) { + types.push(chunk.type); + } + const elapsed = Date.now() - start; + + expect(types).toContain('stdout'); + expect(types).toContain('exit'); + // Must terminate quickly (grace period ~100ms) rather than waiting for the + // 30s sleep. Allow generous headroom for slow CI. + expect(elapsed).toBeLessThan(10000); + + // Clean up the lingering background sleep. + cmd.kill('SIGKILL'); + }, + 20000 +); + +test.skipIf(isWindows)( + 'await on a command does not hang when a grandchild keeps stdout open', + async () => { + const start = Date.now(); + const result = await $({ mirror: false })`sh -c 'sleep 5 & echo done'`; + const elapsed = Date.now() - start; + + expect(result.code).toBe(0); + expect(result.stdout).toContain('done'); + expect(elapsed).toBeLessThan(10000); + }, + 20000 +); + +test.skipIf(isWindows)( + 'stream() can be stopped from inside the loop with kill()', + async () => { + // Endless producer: without stopping it, the loop would never end. + const cmd = $({ + mirror: false, + })`sh -c 'i=0; while true; do i=$((i+1)); echo line-$i; sleep 0.05; done'`; + + const start = Date.now(); + const stdoutCount = []; + let exitChunk; + for await (const chunk of cmd.stream()) { + if (chunk.type === 'stdout') { + stdoutCount.push(chunk.data.toString()); + if (stdoutCount.length >= 3) { + cmd.kill(); // stop the process from inside the loop + } + } else if (chunk.type === 'exit') { + exitChunk = chunk; + } + } + const elapsed = Date.now() - start; + + // The loop ends promptly after kill() rather than running forever. + expect(stdoutCount.length).toBeGreaterThanOrEqual(3); + expect(elapsed).toBeLessThan(10000); + // A kill() still yields a terminating exit chunk (SIGTERM => 143). + expect(exitChunk).toBeDefined(); + expect(exitChunk.code).toBe(143); + expect(cmd.finished).toBe(true); + }, + 20000 +); + +test.skipIf(isWindows)( + 'breaking out of the stream() loop stops the process', + async () => { + const cmd = $({ + mirror: false, + })`sh -c 'i=0; while true; do i=$((i+1)); echo b-$i; sleep 0.05; done'`; + + const start = Date.now(); + let count = 0; + for await (const chunk of cmd.stream()) { + if (chunk.type === 'stdout') { + count += 1; + if (count >= 3) { + break; // abandoning the iterator must terminate the process + } + } + } + const elapsed = Date.now() - start; + + expect(count).toBe(3); + expect(elapsed).toBeLessThan(10000); + // The finally block in stream() kills the still-running process on break. + expect(cmd.finished).toBe(true); + }, + 20000 +); + +test.skipIf(isWindows)( + 'kill() honors the configured killSignal option (SIGINT => 130)', + async () => { + const cmd = $({ + mirror: false, + killSignal: 'SIGINT', + })`sh -c 'i=0; while true; do i=$((i+1)); echo s-$i; sleep 0.05; done'`; + + let count = 0; + let exitChunk; + for await (const chunk of cmd.stream()) { + if (chunk.type === 'stdout') { + if (++count >= 3) { + cmd.kill(); // no explicit signal -> uses the configured killSignal + } + } else if (chunk.type === 'exit') { + exitChunk = chunk; + } + } + + expect(exitChunk).toBeDefined(); + // 128 + SIGINT(2) = 130 + expect(exitChunk.code).toBe(130); + expect(cmd.finished).toBe(true); + }, + 20000 +); + +test.skipIf(isWindows)( + 'an explicit kill(signal) overrides the configured killSignal', + async () => { + const cmd = $({ + mirror: false, + killSignal: 'SIGINT', + })`sh -c 'i=0; while true; do i=$((i+1)); echo k-$i; sleep 0.05; done'`; + + let count = 0; + let exitChunk; + for await (const chunk of cmd.stream()) { + if (chunk.type === 'stdout') { + if (++count >= 3) { + cmd.kill('SIGKILL'); // explicit argument wins over the option + } + } else if (chunk.type === 'exit') { + exitChunk = chunk; + } + } + + expect(exitChunk).toBeDefined(); + // 128 + SIGKILL(9) = 137 + expect(exitChunk.code).toBe(137); + }, + 20000 +); + +test.skipIf(isWindows)( + 'breaking out of the loop uses the configured killSignal', + async () => { + const cmd = $({ + mirror: false, + killSignal: 'SIGINT', + })`sh -c 'i=0; while true; do i=$((i+1)); echo br-$i; sleep 0.05; done'`; + + let count = 0; + for await (const chunk of cmd.stream()) { + if (chunk.type === 'stdout' && ++count >= 3) { + break; // iterator cleanup kills with the configured signal + } + } + + expect(cmd.finished).toBe(true); + // The result the iterator finalizes with reflects the configured signal. + const result = await cmd; + expect(result.code).toBe(130); + }, + 20000 +); + +test.skipIf(isWindows)( + 'an external AbortSignal stops an awaited command and honors killSignal', + async () => { + // Regression: awaiting a long-running command while an external + // AbortSignal fires used to hang forever because the abort listener was + // only registered on the start({...}) path, not the await/then path. + const ac = new AbortController(); + const running = $({ + mirror: false, + signal: ac.signal, + killSignal: 'SIGINT', + })`sh -c 'i=0; while true; do echo a-$i; i=$((i+1)); sleep 0.05; done'`; + + const start = Date.now(); + setTimeout(() => ac.abort(), 200); + const result = await running; + const elapsed = Date.now() - start; + + // Resolves promptly rather than hanging, with the configured signal's code. + expect(elapsed).toBeLessThan(10000); + // 128 + SIGINT(2) = 130 + expect(result.code).toBe(130); + }, + 20000 +); + +test('exit chunk is yielded with zero added latency for normal commands', async () => { + // The exit-pump grace only applies when a grandchild holds the pipe open; + // an ordinary command must terminate the iterator immediately. + const start = Date.now(); + const chunks = []; + for await (const chunk of $({ mirror: false })`echo quick`.stream()) { + chunks.push(chunk.type); + } + const elapsed = Date.now() - start; + + expect(chunks).toContain('exit'); + expect(elapsed).toBeLessThan(1000); +}); diff --git a/js/tests/virtual.test.mjs b/js/tests/virtual.test.mjs index 7efd08b..07d9345 100644 --- a/js/tests/virtual.test.mjs +++ b/js/tests/virtual.test.mjs @@ -210,12 +210,20 @@ describe('Virtual Commands System', () => { }); const chunks = []; + let exitCode; const cmd = $`count 3`; for await (const chunk of cmd.stream()) { + // stream() now also yields a { type: 'exit', code } chunk (issue #155), + // so consumers must guard on chunk.type before touching chunk.data. + if (chunk.type === 'exit') { + exitCode = chunk.code; + continue; + } chunks.push(chunk.data.toString()); } expect(chunks.length).toBeGreaterThan(0); + expect(exitCode).toBe(0); const output = chunks.join(''); // Check if we got the numbers in order, regardless of newlines diff --git a/rust/README.md b/rust/README.md index 74774bb..1c69aee 100644 --- a/rust/README.md +++ b/rust/README.md @@ -32,6 +32,50 @@ async fn main() { } ``` +## Streaming + +`StreamingRunner` streams output as it arrives and mirrors the JavaScript +`stream()` async iterator (issue #155): + +```rust +use command_stream::{OutputChunk, StreamingRunner}; + +#[tokio::main] +async fn main() { + // `kill_signal` configures the stop signal (default SIGTERM), just like the + // JavaScript `killSignal` option. + let runner = StreamingRunner::new("sh -c 'while true; do echo tick; sleep 0.1; done'") + .kill_signal("SIGINT"); + let mut stream = runner.stream(); + + let mut count = 0; + while let Some(chunk) = stream.next().await { + match chunk { + OutputChunk::Stdout(data) => { + print!("{}", String::from_utf8_lossy(&data)); + count += 1; + if count >= 3 { + stream.kill(); // stop from inside the loop (uses SIGINT) + } + } + OutputChunk::Stderr(_) => {} + // A terminating exit chunk is always delivered (128 + signal => 130). + OutputChunk::Exit(code) => println!("exit: {code}"), + } + } +} +``` + +Parity guarantees with the JavaScript implementation: + +- The stream yields a final `OutputChunk::Exit(code)` when the process exits. +- It never hangs when the process has exited but a grandchild keeps the stdio + pipes open — readers are drained for `exit_pump_grace_ms` (default 100ms) and + then aborted. +- The process can be stopped from inside the loop with `stream.kill()` (configured + signal) or `stream.kill_with(signal)` (explicit override); dropping the stream + (e.g. `break`) stops the process too. + ## Command Line The crate also builds a `command-stream` binary: diff --git a/rust/changelog.d/20260609_232146_stream_parity.md b/rust/changelog.d/20260609_232146_stream_parity.md new file mode 100644 index 0000000..35b1639 --- /dev/null +++ b/rust/changelog.d/20260609_232146_stream_parity.md @@ -0,0 +1,18 @@ +--- +bump: minor +--- + +### Added +- `StreamingRunner::kill_signal` to configure the signal used to stop a process + (default `SIGTERM`), mirroring the JavaScript `killSignal` option. +- `StreamingRunner::exit_pump_grace_ms` to configure the post-exit pipe drain + grace period (default 100ms). +- `OutputStream::kill` / `OutputStream::kill_with` to stop a streaming process + from inside the consumption loop; abandoning the stream (drop/`break`) now + stops the process too. + +### Fixed +- `OutputStream` no longer hangs when the process has exited but a grandchild + keeps the stdio pipes open: readers are drained with a grace period and then + aborted, and the exit chunk is always delivered (parity with the JavaScript + fix for issue #155). diff --git a/rust/src/stream.rs b/rust/src/stream.rs index 569af54..7f2d37b 100644 --- a/rust/src/stream.rs +++ b/rust/src/stream.rs @@ -3,6 +3,21 @@ //! This module provides async streaming capabilities similar to JavaScript's //! async iterators and stream handling in `$.stream-utils.mjs`. //! +//! It mirrors the JavaScript implementation's behavior for issue #155: +//! +//! 1. The stream yields an explicit `OutputChunk::Exit(code)` when the +//! process exits, so consumers can observe the exit code from inside the +//! loop. +//! 2. The stream does not hang forever when the process has exited but a +//! grandchild keeps the stdio pipes open (the readers are drained with a +//! grace period and then aborted). +//! 3. The process can be stopped from inside the loop via +//! [`OutputStream::kill`] / [`OutputStream::kill_with`], and abandoning the +//! stream (e.g. `break`) also stops the process. +//! 4. The stop signal is configurable via +//! [`StreamingRunner::kill_signal`] (default `SIGTERM`), just like the +//! JavaScript `killSignal` option. +//! //! ## Usage //! //! ```rust,no_run @@ -21,7 +36,8 @@ //! print!("{}", String::from_utf8_lossy(&data)); //! count += 1; //! if count >= 5 { -//! break; +//! // Stop the process from inside the loop. +//! stream.kill(); //! } //! } //! OutputChunk::Stderr(data) => { @@ -41,6 +57,7 @@ use std::collections::HashMap; use std::path::PathBuf; use std::process::Stdio; +use std::time::Duration; use tokio::io::BufReader; use tokio::process::Command; use tokio::sync::mpsc; @@ -48,6 +65,14 @@ use tokio::sync::mpsc; use crate::trace::trace_lazy; use crate::{CommandResult, Result}; +/// Default grace period (in milliseconds) to keep draining the stdio pipes +/// after the process has exited before aborting any lingering readers. Mirrors +/// the JavaScript `exitPumpGrace` default. +const DEFAULT_EXIT_PUMP_GRACE_MS: u64 = 100; + +/// Default signal used to stop a process when no explicit signal is given. +const DEFAULT_KILL_SIGNAL: &str = "SIGTERM"; + /// A chunk of output from a streaming process #[derive(Debug, Clone)] pub enum OutputChunk { @@ -65,6 +90,8 @@ pub struct StreamingRunner { cwd: Option, env: Option>, stdin_content: Option, + kill_signal: String, + exit_pump_grace_ms: u64, } impl StreamingRunner { @@ -75,6 +102,8 @@ impl StreamingRunner { cwd: None, env: None, stdin_content: None, + kill_signal: DEFAULT_KILL_SIGNAL.to_string(), + exit_pump_grace_ms: DEFAULT_EXIT_PUMP_GRACE_MS, } } @@ -96,25 +125,54 @@ impl StreamingRunner { self } + /// Configure the signal used to stop the process when it is killed without + /// an explicit signal — i.e. [`OutputStream::kill`] or abandoning the + /// stream. Mirrors the JavaScript `killSignal` option (default `SIGTERM`). + /// + /// The reported exit code follows the conventional `128 + signal` mapping + /// (e.g. `SIGTERM` => 143, `SIGINT` => 130, `SIGKILL` => 137). + pub fn kill_signal(mut self, signal: impl Into) -> Self { + self.kill_signal = signal.into(); + self + } + + /// Configure the grace period (in milliseconds) to keep draining the stdio + /// pipes after the process exits before aborting lingering readers. Mirrors + /// the JavaScript `exitPumpGrace` option (default 100ms). + pub fn exit_pump_grace_ms(mut self, ms: u64) -> Self { + self.exit_pump_grace_ms = ms; + self + } + /// Start the process and return a stream of output chunks pub fn stream(mut self) -> OutputStream { let (tx, rx) = mpsc::channel(1024); + // Unbounded so a synchronous Drop can request a kill without awaiting. + let (kill_tx, kill_rx) = mpsc::unbounded_channel::(); // Spawn the process handling task let command = self.command.clone(); let cwd = self.cwd.take(); let env = self.env.take(); let stdin_content = self.stdin_content.take(); + let grace = self.exit_pump_grace_ms; + let kill_signal = self.kill_signal.clone(); tokio::spawn(async move { if let Err(e) = - run_streaming_process(command, cwd, env, stdin_content, tx.clone()).await + run_streaming_process(command, cwd, env, stdin_content, grace, tx.clone(), kill_rx) + .await { trace_lazy("StreamingRunner", || format!("Error: {}", e)); } }); - OutputStream { rx } + OutputStream { + rx, + kill_tx, + kill_signal, + killed: false, + } } /// Run to completion and collect all output @@ -143,6 +201,9 @@ impl StreamingRunner { /// Stream of output chunks from a process pub struct OutputStream { rx: mpsc::Receiver, + kill_tx: mpsc::UnboundedSender, + kill_signal: String, + killed: bool, } impl OutputStream { @@ -151,6 +212,29 @@ impl OutputStream { self.rx.recv().await } + /// Stop the process using the configured kill signal (default `SIGTERM`). + /// + /// This can be called from inside the consumption loop to stop a + /// long-running or endless process; a terminating `OutputChunk::Exit` is + /// still delivered afterwards. + pub fn kill(&mut self) { + let signal = self.kill_signal.clone(); + self.kill_with(&signal); + } + + /// Stop the process using an explicit signal, overriding the configured + /// kill signal for this call. + pub fn kill_with(&mut self, signal: &str) { + if self.killed { + return; + } + self.killed = true; + trace_lazy("OutputStream", || format!("kill | signal={}", signal)); + // Best effort: the task may have already finished, in which case the + // receiver is gone and the send fails harmlessly. + let _ = self.kill_tx.send(signal.to_string()); + } + /// Collect all remaining output into vectors pub async fn collect(mut self) -> (Vec, Vec, i32) { let mut stdout = Vec::new(); @@ -182,13 +266,26 @@ impl OutputStream { } } +impl Drop for OutputStream { + fn drop(&mut self) { + // Abandoning the stream (e.g. `break`-ing out of the loop) must stop the + // process, matching the JavaScript iterator's `finally` cleanup. If the + // process already finished this is a harmless no-op. + if !self.killed { + let _ = self.kill_tx.send(self.kill_signal.clone()); + } + } +} + /// Run a streaming process and send output to the channel async fn run_streaming_process( command: String, cwd: Option, env: Option>, stdin_content: Option, + exit_pump_grace_ms: u64, tx: mpsc::Sender, + mut kill_rx: mpsc::UnboundedReceiver, ) -> Result<()> { trace_lazy("StreamingRunner", || format!("Starting: {}", command)); @@ -208,6 +305,11 @@ async fn run_streaming_process( cmd.stdout(Stdio::piped()); cmd.stderr(Stdio::piped()); + // Run the child in its own process group so we can signal the whole group + // (parent + grandchildren), matching the JavaScript implementation. + #[cfg(unix)] + cmd.process_group(0); + // Set working directory if let Some(ref cwd) = cwd { cmd.current_dir(cwd); @@ -235,8 +337,8 @@ async fn run_streaming_process( // Spawn stdout reader let stdout = child.stdout.take(); let tx_stdout = tx.clone(); - let stdout_handle = if let Some(stdout) = stdout { - Some(tokio::spawn(async move { + let stdout_handle = stdout.map(|stdout| { + tokio::spawn(async move { let mut reader = BufReader::new(stdout); let mut buf = vec![0u8; 8192]; loop { @@ -255,16 +357,14 @@ async fn run_streaming_process( Err(_) => break, } } - })) - } else { - None - }; + }) + }); // Spawn stderr reader let stderr = child.stderr.take(); let tx_stderr = tx.clone(); - let stderr_handle = if let Some(stderr) = stderr { - Some(tokio::spawn(async move { + let stderr_handle = stderr.map(|stderr| { + tokio::spawn(async move { let mut reader = BufReader::new(stderr); let mut buf = vec![0u8; 8192]; loop { @@ -283,24 +383,70 @@ async fn run_streaming_process( Err(_) => break, } } - })) - } else { - None - }; - - // Wait for readers to complete - if let Some(handle) = stdout_handle { - let _ = handle.await; - } - if let Some(handle) = stderr_handle { - let _ = handle.await; + }) + }); + + // Wait for the process to exit OR for a kill request — crucially we do NOT + // wait for the readers first. If a grandchild keeps the pipe open the + // readers would never finish, so waiting on them before the exit (as the + // old implementation did) would hang forever (issue #155). + let pid = child.id(); + let code; + tokio::select! { + status = child.wait() => { + code = status_to_code(status?); + } + maybe_signal = kill_rx.recv() => { + // A kill was requested (explicit kill()/kill_with() or the stream + // being dropped). Stop the process group with the requested signal. + let signal = maybe_signal.unwrap_or_else(|| DEFAULT_KILL_SIGNAL.to_string()); + trace_lazy("StreamingRunner", || format!("Kill requested | signal={}", signal)); + if let Some(pid) = pid { + send_signal_to_process(pid, &signal); + } + // Give it a brief moment to exit on the requested signal, then + // escalate to a forceful kill so it always terminates. + if tokio::time::timeout(Duration::from_millis(exit_pump_grace_ms), child.wait()) + .await + .is_err() + { + let _ = child.start_kill(); + let _ = child.wait().await; + } + // Report the conventional 128 + signal code for the requested + // signal, matching the JavaScript implementation. + code = 128 + signal_number(&signal); + } } - // Wait for process to exit - let status = child.wait().await?; - let code = status.code().unwrap_or(-1); + // The process has exited. Give the readers a short grace period to flush any + // buffered output, then abort any that are still blocked on an inherited + // open pipe so we don't hang. + let stdout_abort = stdout_handle.as_ref().map(|h| h.abort_handle()); + let stderr_abort = stderr_handle.as_ref().map(|h| h.abort_handle()); + let drain = async { + if let Some(handle) = stdout_handle { + let _ = handle.await; + } + if let Some(handle) = stderr_handle { + let _ = handle.await; + } + }; + if tokio::time::timeout(Duration::from_millis(exit_pump_grace_ms), drain) + .await + .is_err() + { + // A reader is still blocked on an inherited open pipe — abort it so the + // exit chunk is delivered without waiting for the grandchild. + if let Some(abort) = stdout_abort { + abort.abort(); + } + if let Some(abort) = stderr_abort { + abort.abort(); + } + } - // Send exit code + // Send exit code (always — even if a reader was aborted). let _ = tx.send(OutputChunk::Exit(code)).await; trace_lazy("StreamingRunner", || format!("Exited with code: {}", code)); @@ -308,6 +454,65 @@ async fn run_streaming_process( Ok(()) } +/// Convert an exit status into a numeric exit code, using the conventional +/// `128 + signal` mapping when the process was terminated by a signal. +fn status_to_code(status: std::process::ExitStatus) -> i32 { + if let Some(code) = status.code() { + return code; + } + #[cfg(unix)] + { + use std::os::unix::process::ExitStatusExt; + if let Some(sig) = status.signal() { + return 128 + sig; + } + } + -1 +} + +/// Map a signal name to its numeric value for the `128 + signal` exit-code +/// convention. Unknown names fall back to `SIGTERM`. +fn signal_number(signal: &str) -> i32 { + match signal { + "SIGHUP" => 1, + "SIGINT" => 2, + "SIGQUIT" => 3, + "SIGKILL" => 9, + "SIGUSR1" => 10, + "SIGUSR2" => 12, + "SIGTERM" => 15, + _ => 15, + } +} + +/// Send a signal to a process and its process group (best effort). +#[cfg(unix)] +fn send_signal_to_process(pid: u32, signal: &str) { + use nix::sys::signal::{kill, Signal}; + use nix::unistd::Pid; + + let sig = match signal { + "SIGHUP" => Signal::SIGHUP, + "SIGINT" => Signal::SIGINT, + "SIGQUIT" => Signal::SIGQUIT, + "SIGKILL" => Signal::SIGKILL, + "SIGUSR1" => Signal::SIGUSR1, + "SIGUSR2" => Signal::SIGUSR2, + "SIGTERM" => Signal::SIGTERM, + _ => Signal::SIGTERM, + }; + + // Signal the process itself. + let _ = kill(Pid::from_raw(pid as i32), sig); + // Signal the whole process group (negative pid) to reach grandchildren. + let _ = kill(Pid::from_raw(-(pid as i32)), sig); +} + +/// On non-Unix platforms there is no signal delivery; the forceful +/// `start_kill()` escalation in the caller handles termination. +#[cfg(not(unix))] +fn send_signal_to_process(_pid: u32, _signal: &str) {} + /// Shell configuration #[derive(Debug, Clone)] struct ShellConfig { diff --git a/rust/tests/stream.rs b/rust/tests/stream.rs index d78a344..c0f8222 100644 --- a/rust/tests/stream.rs +++ b/rust/tests/stream.rs @@ -91,6 +91,159 @@ async fn test_streaming_runner_cwd() { ); } +// --- Regression tests for issue #155 parity with the JavaScript implementation --- + +/// The stream must yield an explicit Exit chunk as the final chunk. +#[tokio::test] +async fn test_stream_yields_exit_chunk_last() { + let runner = StreamingRunner::new("echo hello"); + let mut stream = runner.stream(); + + let mut chunks = Vec::new(); + while let Some(chunk) = stream.next().await { + chunks.push(chunk); + } + + assert!(!chunks.is_empty()); + let last = chunks.last().unwrap(); + matches!(last, OutputChunk::Exit(0)); + if let OutputChunk::Exit(code) = last { + assert_eq!(*code, 0); + } else { + panic!("last chunk must be an Exit chunk, got {:?}", last); + } +} + +/// The stream must not hang when a grandchild keeps the stdout pipe open after +/// the immediate child has exited. +#[cfg(unix)] +#[tokio::test] +async fn test_stream_does_not_hang_on_open_pipe() { + use std::time::Instant; + + let start = Instant::now(); + // `sh` exits immediately after `echo done`, but the backgrounded `sleep` + // inherits the stdout pipe and keeps it open. + let runner = StreamingRunner::new("sh -c 'sleep 5 & echo done'"); + let mut stream = runner.stream(); + + let mut saw_stdout = false; + let mut saw_exit = false; + while let Some(chunk) = stream.next().await { + match chunk { + OutputChunk::Stdout(_) => saw_stdout = true, + OutputChunk::Exit(_) => saw_exit = true, + _ => {} + } + } + let elapsed = start.elapsed(); + + assert!(saw_stdout, "expected stdout output"); + assert!(saw_exit, "expected an exit chunk"); + // Must terminate quickly (grace ~100ms) rather than waiting for the sleep. + assert!( + elapsed.as_secs() < 5, + "stream hung for {:?}; expected prompt termination", + elapsed + ); +} + +/// The process can be stopped from inside the loop with kill(), and the +/// reported exit code follows the configured kill signal. +#[cfg(unix)] +#[tokio::test] +async fn test_stream_kill_from_loop_honors_kill_signal() { + use std::time::Instant; + + let start = Instant::now(); + // Endless producer. + let runner = StreamingRunner::new( + "sh -c 'i=0; while true; do i=$((i+1)); echo line-$i; sleep 0.05; done'", + ) + .kill_signal("SIGINT"); + let mut stream = runner.stream(); + + let mut stdout_count = 0; + let mut exit_code = None; + while let Some(chunk) = stream.next().await { + match chunk { + OutputChunk::Stdout(_) => { + stdout_count += 1; + if stdout_count >= 3 { + stream.kill(); // configured signal => SIGINT + } + } + OutputChunk::Exit(code) => exit_code = Some(code), + _ => {} + } + } + let elapsed = start.elapsed(); + + assert!(stdout_count >= 3); + assert!(elapsed.as_secs() < 10, "loop did not stop promptly"); + // 128 + SIGINT(2) = 130 + assert_eq!(exit_code, Some(130)); +} + +/// An explicit kill_with(signal) overrides the configured kill signal. +#[cfg(unix)] +#[tokio::test] +async fn test_stream_kill_with_overrides_configured_signal() { + let runner = + StreamingRunner::new("sh -c 'i=0; while true; do i=$((i+1)); echo k-$i; sleep 0.05; done'") + .kill_signal("SIGINT"); + let mut stream = runner.stream(); + + let mut stdout_count = 0; + let mut exit_code = None; + while let Some(chunk) = stream.next().await { + match chunk { + OutputChunk::Stdout(_) => { + stdout_count += 1; + if stdout_count >= 3 { + stream.kill_with("SIGKILL"); + } + } + OutputChunk::Exit(code) => exit_code = Some(code), + _ => {} + } + } + + // 128 + SIGKILL(9) = 137 + assert_eq!(exit_code, Some(137)); +} + +/// Abandoning the stream (dropping it, e.g. after `break`) stops the process. +#[cfg(unix)] +#[tokio::test] +async fn test_stream_break_stops_process() { + use std::time::Instant; + + let start = Instant::now(); + let runner = + StreamingRunner::new("sh -c 'i=0; while true; do i=$((i+1)); echo b-$i; sleep 0.05; done'"); + let mut stream = runner.stream(); + + let mut count = 0; + while let Some(chunk) = stream.next().await { + if let OutputChunk::Stdout(_) = chunk { + count += 1; + if count >= 3 { + break; // dropping the stream must terminate the process + } + } + } + // Drop the stream explicitly to trigger the kill. + drop(stream); + let elapsed = start.elapsed(); + + assert_eq!(count, 3); + assert!( + elapsed.as_secs() < 10, + "process was not stopped promptly on break" + ); +} + #[tokio::test] async fn test_streaming_runner_env() { use std::collections::HashMap;