Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 75 additions & 0 deletions .github/scripts/check-language-parity.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
#!/usr/bin/env bash
#
# Language parity check.
#
# command-stream ships two implementations that must stay in lock-step: the
# JavaScript library under js/src/** and the Rust library under rust/src/**.
# This script fails when a pull request changes one language's source without
# touching the other's, so that behavioral changes are always made in both
# languages (see issue #155 review feedback).
#
# Escape hatch: add the `parity-exempt` label to the PR for changes that are
# legitimately single-language (the workflow skips this check when the label is
# present).
#
# Environment:
# BASE_REF - the base branch to diff against (default: main). In GitHub
# Actions this is github.base_ref.
#
# Usage (locally):
# BASE_REF=main bash .github/scripts/check-language-parity.sh
set -euo pipefail

BASE_REF="${BASE_REF:-main}"

# Make sure the base branch is available locally, then resolve a ref we can diff
# against. Prefer the remote-tracking ref; fall back to the bare branch name.
git fetch --no-tags origin "${BASE_REF}" >/dev/null 2>&1 || true
if git rev-parse --verify --quiet "origin/${BASE_REF}" >/dev/null; then
BASE="origin/${BASE_REF}"
elif git rev-parse --verify --quiet "${BASE_REF}" >/dev/null; then
BASE="${BASE_REF}"
else
echo "::warning::Could not resolve base ref '${BASE_REF}'; skipping parity check."
exit 0
fi

MERGE_BASE="$(git merge-base "${BASE}" HEAD 2>/dev/null || echo "${BASE}")"
CHANGED="$(git diff --name-only "${MERGE_BASE}" HEAD)"

echo "Comparing against ${BASE} (merge-base ${MERGE_BASE})"
echo "Changed files:"
echo "${CHANGED}" | sed 's/^/ /'

js_changed=false
rust_changed=false
while IFS= read -r f; do
[ -z "${f}" ] && continue
case "${f}" in
js/src/*) js_changed=true ;;
rust/src/*) rust_changed=true ;;
esac
done <<EOF
${CHANGED}
EOF

echo "js/src changed: ${js_changed}"
echo "rust/src changed: ${rust_changed}"

if [ "${js_changed}" = "true" ] && [ "${rust_changed}" != "true" ]; then
echo "::error::JavaScript source (js/src/**) changed but Rust source (rust/src/**) did not."
echo "command-stream keeps the JavaScript and Rust implementations in parity."
echo "Please make the equivalent change under rust/src/**, or add the"
echo "'parity-exempt' label to this PR if the change is intentionally JS-only."
exit 1
fi

if [ "${rust_changed}" = "true" ] && [ "${js_changed}" != "true" ]; then
echo "::error::Rust source (rust/src/**) changed but JavaScript source (js/src/**) did not."
echo "command-stream keeps the JavaScript and Rust implementations in parity."
echo "Please make the equivalent change under js/src/**, or add the"
echo "'parity-exempt' label to this PR if the change is intentionally Rust-only."
exit 1
fi

echo "Language parity check passed."
37 changes: 37 additions & 0 deletions .github/workflows/parity.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
name: Language parity check

# Ensure behavioral changes are made in both the JavaScript (js/src/**) and the
# Rust (rust/src/**) implementations. A PR that changes one without the other
# fails this check unless it carries the `parity-exempt` label.
#
# See issue #155 review feedback: "double check that all features that are
# supported in JavaScript are fully supported in Rust and we have CI/CD rules,
# that check that we do changes in both languages always".

on:
pull_request:
types: [opened, synchronize, reopened, labeled, unlabeled]

concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
cancel-in-progress: true

permissions:
contents: read

jobs:
parity:
name: JS/Rust source parity
runs-on: ubuntu-latest
timeout-minutes: 10
# Skip entirely when the PR is explicitly marked as a single-language change.
if: ${{ !contains(github.event.pull_request.labels.*.name, 'parity-exempt') }}
steps:
- uses: actions/checkout@v6
with:
fetch-depth: 0

- name: Check JavaScript/Rust source parity
env:
BASE_REF: ${{ github.base_ref }}
run: bash .github/scripts/check-language-parity.sh
27 changes: 27 additions & 0 deletions js/.changeset/issue-155-stream-exit-chunks.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
---
'command-stream': minor
---

Fix `stream()` async iterator to yield exit chunks and never hang on open pipes (issue #155)

- `stream()` now yields a final `{ type: 'exit', code }` chunk when the process
exits, so the documented `chunk.type === 'exit'` handling is no longer dead
code. Consumers that touch `chunk.data` must guard on `chunk.type` first.
- Both `stream()` and awaiting a command no longer hang forever when the process
has exited but a grandchild keeps the stdio pipes open (e.g.
`sh -c 'long-task & echo done'`). The command resolves as soon as the process
exits; remaining buffered output is drained within a short grace period before
the lingering reads are aborted.
- The grace period is configurable via the `exitPumpGrace` option (milliseconds,
default `100`). For ordinary commands the pumps drain immediately, so the grace
adds no latency — it only bounds the wait in the grandchild-holds-pipe case.
- A long-running command can be stopped from inside the `stream()` loop, either by
calling `kill()` (the loop then ends with a terminating `exit` chunk) or by
`break`ing out of the loop (which kills the process as the iterator unwinds).
- The stop signal is configurable via the new `killSignal` option (default
`SIGTERM`). An argument-less `kill()`, a `break`, and an external `AbortSignal`
all use it; an explicit `kill(signal)` argument still overrides it. Exit codes
follow the conventional `128 + signal` mapping (e.g. `SIGINT` => 130).
- Awaiting a command while an external `AbortSignal` fires no longer hangs: the
abort listener is now registered on the await/then path too, so the command
resolves promptly with the configured signal's exit code.
75 changes: 75 additions & 0 deletions js/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -360,10 +360,83 @@ import { $ } from 'command-stream';
for await (const chunk of $`long-running-command`.stream()) {
if (chunk.type === 'stdout') {
console.log('Real-time output:', chunk.data.toString());
} else if (chunk.type === 'exit') {
console.log('Process exited with code:', chunk.code);
}
}
```

`stream()` yields `{ type: 'stdout' | 'stderr', data: Buffer }` chunks as output
arrives, followed by a final `{ type: 'exit', code }` chunk when the process
exits. Always guard on `chunk.type` before reading `chunk.data`, since the
`exit` chunk carries `code` instead of `data`.

The iterator terminates as soon as the process exits, even if a grandchild keeps
the stdout/stderr pipe open (e.g. `sh -c 'background-task & echo done'`). Any
output still buffered is drained within a short grace period (the `exitPumpGrace`
option, default `100`ms) before the lingering reads are aborted, so the loop
never hangs waiting on a pipe the command itself is no longer using.

#### Stopping the process from inside the loop

You can stop a long-running command while iterating over it — either by calling
`kill()` on the command, or simply by `break`ing out of the loop (which kills the
process automatically as the iterator is cleaned up):

```javascript
const cmd = $`some-endless-stream`;

for await (const chunk of cmd.stream()) {
if (chunk.type === 'stdout') {
console.log(chunk.data.toString());
if (seenEnoughOutput(chunk)) {
cmd.kill(); // stops the process; the loop then ends with an exit chunk
}
} else if (chunk.type === 'exit') {
console.log('stopped with code', chunk.code); // 143 for the SIGTERM from kill()
}
}

// Or just break — the process is terminated as the loop unwinds:
for await (const chunk of $`some-endless-stream`.stream()) {
if (chunk.type === 'stdout' && done(chunk)) break;
}
```

##### Choosing the stop signal

`kill()` defaults to `SIGTERM`, but you can stop with any signal. Pass it
explicitly, or configure a default via the `killSignal` option so that an
argument-less `kill()`, a `break`, or an `AbortSignal` all use it:

```javascript
// Explicit per-call signal:
cmd.kill('SIGINT'); // exit code 130

// Configured default — used by kill(), break, and AbortSignal cancellation:
const cmd = $({ killSignal: 'SIGINT' })`some-endless-stream`;
for await (const chunk of cmd.stream()) {
if (chunk.type === 'stdout' && done(chunk))
cmd.kill(); // sends SIGINT
else if (chunk.type === 'exit') console.log(chunk.code); // 130
}

// AbortSignal style also honors killSignal — awaiting resolves promptly when
// the signal fires (it does not hang) with the configured signal's exit code:
const ac = new AbortController();
const running = $({
signal: ac.signal,
killSignal: 'SIGINT',
})`some-endless-stream`;
setTimeout(() => ac.abort(), 1000); // stops with SIGINT
const result = await running;
console.log(result.code); // 130
```

command-stream still escalates to `SIGKILL` after delivering the chosen signal
so a process that ignores it is guaranteed to terminate; the reported exit code
reflects the signal you configured.

### EventEmitter Pattern (Event-driven)

```javascript
Expand Down Expand Up @@ -912,6 +985,8 @@ The enhanced `$` function returns a `ProcessRunner` instance that extends `Event
- `interactive: boolean` - Enable TTY forwarding for interactive commands (requires `stdin: 'inherit'` and TTY environment)
- `cwd: string` - Working directory for command
- `env: object` - Environment variables
- `exitPumpGrace: number` - Milliseconds to wait for buffered output to drain after the process exits before aborting stdio reads held open by a grandchild (default `100`; see [Async Iteration](#async-iteration-real-time-streaming))
- `killSignal: string` - Signal used to stop the process when it is killed without an explicit signal — i.e. `kill()` with no argument, `break`ing out of a `stream()` loop, or an external `AbortSignal` firing (default `'SIGTERM'`). An explicit `kill(signal)` argument always overrides this. The reported exit code follows the conventional `128 + signal` mapping (e.g. `SIGTERM` → 143, `SIGINT` → 130, `SIGKILL` → 137)

**Override defaults:**

Expand Down
49 changes: 49 additions & 0 deletions js/examples/stream-exit-chunks.mjs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
#!/usr/bin/env node
// Demonstrates issue #155 fixes for the stream() async iterator:
// 1. stream() yields a final { type: 'exit', code } chunk on process exit.
// 2. stream() (and awaiting a command) no longer hangs when the process has
// exited but a grandchild keeps the stdout pipe open.
import { $ } from '../src/$.mjs';

// 1. Observe the exit code from within the async iterator.
console.log('--- exit chunk ---');
for await (const chunk of $({
mirror: false,
})`sh -c 'echo hello; exit 3'`.stream()) {
if (chunk.type === 'exit') {
console.log('exit chunk, code =', chunk.code);
} else {
console.log(chunk.type, '=>', chunk.data.toString().trim());
}
}

// 2. The backgrounded `sleep` inherits stdout and keeps it open, but the
// iterator still terminates promptly once `sh` itself exits.
console.log('--- no hang with a lingering grandchild ---');
const start = Date.now();
for await (const chunk of $({
mirror: false,
})`sh -c 'sleep 30 & echo done'`.stream()) {
if (chunk.type === 'exit') {
console.log(`finished in ${Date.now() - start}ms with code ${chunk.code}`);
} else {
console.log(chunk.type, '=>', chunk.data.toString().trim());
}
}

// 3. Stop a long-running command from inside the loop by calling kill().
console.log('--- stop from inside the loop ---');
const endless = $({
mirror: false,
})`sh -c 'i=0; while true; do i=$((i+1)); echo tick-$i; sleep 0.1; done'`;
let ticks = 0;
for await (const chunk of endless.stream()) {
if (chunk.type === 'stdout') {
console.log('stdout =>', chunk.data.toString().trim());
if (++ticks >= 3) {
endless.kill(); // ends the loop with an exit chunk
}
} else if (chunk.type === 'exit') {
console.log('stopped with code', chunk.code);
}
}
48 changes: 48 additions & 0 deletions js/experiments/bun-abort-iter.mjs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
const start = Date.now();
const child = Bun.spawn(['sh', '-c', 'sleep 3 & echo done'], {
stdin: 'pipe',
stdout: 'pipe',
stderr: 'pipe',
detached: true,
});
const iterator = child.stdout[Symbol.asyncIterator]();
let aborted = false;
let abortResolve;
const abortP = new Promise((r) => (abortResolve = r));
setTimeout(() => {
aborted = true;
abortResolve({ aborted: true });
console.log('aborting at', Date.now() - start);
}, 150);
(async () => {
try {
while (true) {
const res = await Promise.race([iterator.next(), abortP]);
if (res.aborted) {
console.log('abort detected at', Date.now() - start);
break;
}
if (res.done) {
console.log('done at', Date.now() - start);
break;
}
console.log(
'chunk at',
Date.now() - start,
new TextDecoder().decode(res.value).trim()
);
}
} finally {
if (iterator.return) {
await iterator
.return()
.catch((e) => console.log('return err', e.message));
console.log('iterator.return done at', Date.now() - start);
}
}
console.log('LOOP EXITED at', Date.now() - start);
})();
setTimeout(() => {
console.log('--- 4s, exit');
process.exit(0);
}, 4000);
30 changes: 30 additions & 0 deletions js/experiments/bun-exit-timing.mjs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
const start = Date.now();
const child = Bun.spawn(['sh', '-c', 'sleep 3 & echo done'], {
stdin: 'pipe',
stdout: 'pipe',
stderr: 'pipe',
detached: true,
});
child.exited.then((code) =>
console.log('child.exited resolved at', Date.now() - start, 'code', code)
);
// read stdout
const reader = child.stdout.getReader();
(async () => {
while (true) {
const { done, value } = await reader.read();
if (done) {
console.log('stdout EOF at', Date.now() - start);
break;
}
console.log(
'stdout data at',
Date.now() - start,
new TextDecoder().decode(value).trim()
);
}
})();
setTimeout(() => {
console.log('--- 4s mark, exiting');
process.exit(0);
}, 4000);
Loading
Loading