Skip to content

fix(stream): yield exit chunks and prevent hang on open pipes (#155)#169

Merged
konard merged 11 commits into
mainfrom
issue-155-b0a2bbf9a2bc
Jun 10, 2026
Merged

fix(stream): yield exit chunks and prevent hang on open pipes (#155)#169
konard merged 11 commits into
mainfrom
issue-155-b0a2bbf9a2bc

Conversation

@konard

@konard konard commented Jun 9, 2026

Copy link
Copy Markdown
Member

Summary

Fixes #155. The stream() async iterator had two problems:

  1. No exit chunksstream() only yielded {type:'stdout'|'stderr', data} chunks. The documented chunk.type === 'exit' handling was dead code, so the exit code could only be read from (await cmd).code after the loop.
  2. Hang on open pipesfinish() (which emits 'end', ending the iterator) was gated behind Promise.all([outPump, errPump, ...]). If a grandchild inherited the stdout pipe and kept it open (e.g. sh -c 'task & echo done'), the pump never finished and both stream() and await hung forever.

Changes

  • Yield exit chunks: stream() now pushes a final { type: 'exit', code } chunk when the process exits (emitted by finish() right after 'end'), so the documented pattern works.
  • Exit detection independent of pipes (src/$.process-runner-exit.mjs, extracted from execution):
    • createExitPromise() resolves on the child 'exit' event (not 'close', which waits for all stdio to close) — and also on cancellation, so a kill that removes child listeners doesn't hang the awaiting caller. Exit detection is immediate and event-driven.
    • drainPumpsAfterExit() waits a short grace period for buffered output to drain after exit, then aborts lingering stdio reads.
  • Cancellable pumps: pumpReadable() accepts an AbortSignal so a pending read() (Web/Bun ReadableStream via an explicit reader, or a Node Readable via destroy()) can be interrupted when pipes stay open.
  • exitPumpGrace option (default 100ms) configures the grace period.
  • Stop from inside the loop (review feedback): a long-running command can be stopped while iterating, either by calling kill() (the loop ends with a terminating exit chunk) or by breaking out of the loop (the process is killed as the iterator unwinds). Documented + tested.
  • Docs (README), an example, and a changeset (minor).

On the exitPumpGrace grace period (review question)

Exit detection is immediate — we subscribe to the child 'exit' event. The grace period is a separate, bounded wait to drain output the OS pipe still holds after the process exits (the 'exit' event and the final stdout 'data'/EOF are independent, unordered events; aborting the pumps the instant 'exit' fires could truncate the last line).

  • Normal commands: zero added latency. The pumps reach EOF within a microtask of exit and we return immediately — echo quick through stream() completes in 0–1ms; the 100ms timer never elapses.
  • The grace only applies in the pathological case — a grandchild holds the pipe open forever. There EOF never comes, so a timeout is the only way to stop. Set exitPumpGrace: 0 for strict immediacy (note: racy — can truncate the trailing chunk).

Reproduction

import { $ } from 'command-stream';

// Before: prints "stdout done", then hangs forever on the lingering `sleep`.
// After:  terminates in ~100ms and yields an exit chunk.
for await (const chunk of $`sh -c 'sleep 30 & echo done'`.stream()) {
  if (chunk.type === 'exit') console.log('exit code:', chunk.code);
  else console.log(chunk.type, chunk.data.toString().trim());
}

Stopping from inside the loop:

const cmd = $`some-endless-stream`;
for await (const chunk of cmd.stream()) {
  if (chunk.type === 'stdout' && enough(chunk)) cmd.kill();  // ends loop with an exit chunk
  else if (chunk.type === 'exit') console.log('stopped with code', chunk.code); // 143
}

Run the bundled demo: bun js/examples/stream-exit-chunks.mjs

Tests

js/tests/stream-exit-chunks.test.mjs (all passing under Bun and Node):

  • exit chunk on success (code 0, is the last chunk)
  • exit chunk reports a non-zero code (7)
  • stream() does not hang when a grandchild keeps stdout open (elapsed < 10s)
  • await does not hang in the same scenario
  • kill() inside the loop stops promptly and yields a terminating exit chunk
  • break inside the loop terminates the process
  • a normal command's exit chunk arrives with zero added latency

The full suite was run before and after: the only remaining failures are pre-existing and environmental (no jq binary installed locally; CI installs jq and they pass) — identical with and without this change.

Compatibility note

Yielding exit chunks is a behavior change: consumers that read chunk.data inside a stream() loop must now guard on chunk.type first (the exit chunk carries code, not data). The README's stream() examples have always guarded on chunk.type, so idiomatic consumers are unaffected. Hence the minor changeset.

Fixes #155


Review follow-up (2026-06-09)

Addresses the three asks in the latest review.

1. Configurable stop signal

The signal used to stop a process is now configurable instead of hardcoded SIGTERM.

  • JavaScript: new killSignal option (default 'SIGTERM'). It is honored by an
    argument-less kill(), by a break out of the loop, and by an external
    AbortSignal. An explicit kill(signal) argument still overrides it. Exit codes
    follow the conventional 128 + signal mapping (e.g. SIGINT => 130, SIGKILL => 137).

    const cmd = $({ killSignal: 'SIGINT' })`some-endless-stream`;
    for await (const chunk of cmd.stream()) {
      if (chunk.type === 'stdout' && enough(chunk)) cmd.kill(); // stops with SIGINT
      else if (chunk.type === 'exit') console.log(chunk.code);  // 130
    }
  • Also fixed: awaiting a command while an external AbortSignal fires no longer hangs.
    The abort listener was only registered on the start() path; it is now registered on
    the await/then() path too (idempotently), so await running resolves promptly with
    the configured signal's exit code.

2. Rust parity

The same stream features now exist in the Rust implementation (rust/src/stream.rs),
which already exposes an async-iterator-style OutputStream (stream.next().await):

  • A terminating OutputChunk::Exit(code) is always yielded when the process exits.
  • It never hangs when a grandchild keeps the stdio pipes open — the child wait is
    independent of the readers (tokio::select!), readers are drained for
    exit_pump_grace_ms (default 100ms, configurable) and then aborted.
  • The process can be stopped from inside the loop: stream.kill() (configured signal)
    or stream.kill_with(signal) (explicit override); dropping the stream (break) stops
    it too (impl Drop).
  • The stop signal is configurable via StreamingRunner::kill_signal(..) (default
    SIGTERM), mirroring the JS killSignal option, with the same 128 + signal exit codes.

Covered by rust/tests/stream.rs (exit-chunk-last, no-hang-on-open-pipe, kill-honors-signal,
kill_with-override, break-stops-process) and a rust/changelog.d fragment.

3. CI/CD enforces dual-language changes

New Language parity check workflow (.github/workflows/parity.yml +
.github/scripts/check-language-parity.sh): a PR that changes js/src/** without
rust/src/** (or vice versa) fails the check, so behavioral changes are always made in
both languages. Escape hatch: the parity-exempt label skips the check for legitimately
single-language changes.

Adding .gitkeep for PR creation (default mode).
This file will be removed when the task is complete.

Issue: #155
@konard konard self-assigned this Jun 9, 2026
konard added 3 commits June 9, 2026 19:59
stream() now yields a final { type: 'exit', code } chunk when the
process exits, and neither stream() nor awaiting a command hangs when a
grandchild keeps the stdio pipes open.

- Add onExit handler to stream() that pushes { type: 'exit', code }.
- Extract exit detection + pump draining into $.process-runner-exit.mjs:
  createExitPromise resolves on the child 'exit' event (not 'close') or
  on cancellation, and drainPumpsAfterExit waits a short grace period for
  buffered output before aborting lingering stdio reads.
- pumpReadable accepts an AbortSignal so a pending read can be cancelled
  when pipes stay open after exit.
- exitPumpGrace option (default 100ms) configures the grace period.
- Tidy quote.mjs lint warnings touched by the pump change.
Add stream-exit-chunks.test.mjs verifying exit chunks (success and
non-zero code) and that stream()/await do not hang when a grandchild
keeps stdout open. Update the virtual async-generator test to guard on
chunk.type now that exit chunks are yielded.
…155)

- README: document the { type: 'exit', code } chunk, the no-hang
  guarantee, and the exitPumpGrace option.
- Add examples/stream-exit-chunks.mjs demonstrating both fixes.
- Add changeset (minor) and the experiment scripts used to diagnose #155.
@konard konard changed the title [WIP] stream() async iterator does not yield exit chunks and can hang if process stdout stays open fix(stream): yield exit chunks and prevent hang on open pipes (#155) Jun 9, 2026
@konard konard marked this pull request as ready for review June 9, 2026 20:02
@konard

konard commented Jun 9, 2026

Copy link
Copy Markdown
Member Author

Working session summary

CI passed. Everything is complete.

Summary

Issue #155 is fully resolved and PR #169 is ready for review.

What was fixed:

  1. stream() now yields exit chunks — a final { type: 'exit', code } chunk is pushed when the process exits, so the documented (previously dead) chunk.type === 'exit' handling works.
  2. No more hang on open pipes — exit detection no longer waits on Promise.all of the stdio pumps. A new createExitPromise() (in src/$.process-runner-exit.mjs) resolves on the child 'exit' event (and on cancellation), and drainPumpsAfterExit() waits a short grace period (configurable via exitPumpGrace, default 100ms) before aborting lingering reads via an AbortSignal threaded through pumpReadable().

Final state:

PR: #169


This summary was automatically extracted from the AI working session output.

@konard

konard commented Jun 9, 2026

Copy link
Copy Markdown
Member Author

🤖 Solution Draft Log

This log file contains the complete execution trace of the AI solution draft process.

💰 Cost: $11.468476

📊 Context and tokens usage:

Claude Opus 4.8: (3 sub-sessions)

  1. 116.8K / 1M (12%) input tokens, 53.1K / 128K (42%) output tokens
  2. 116.9K / 1M (12%) input tokens, 38.2K / 128K (30%) output tokens
  3. 35.7K / 1M (4%) input tokens, 1.1K / 128K (1%) output tokens

Total: (36.3K new + 236.3K cache writes + 14.2M cache reads) input tokens, 107.6K output tokens, $11.468476 cost

🤖 Models used:

  • Tool: Anthropic Claude Code
  • Requested: opus
  • Model: Claude Opus 4.8 (claude-opus-4-8)

📎 Log file uploaded as Gist (5428KB)


Now working session is ended, feel free to review and add any feedback on the solution draft.

@konard

konard commented Jun 9, 2026

Copy link
Copy Markdown
Member Author

✅ Ready to merge

This pull request is now ready to be merged:

  • All CI checks have passed
  • No merge conflicts
  • No pending changes

Monitored by hive-mind with --auto-restart-until-mergeable flag

@konard

konard commented Jun 9, 2026

Copy link
Copy Markdown
Member Author

Double check there will be no breaking changes, and everything will work in the best possible way. If that breaks previously documented behavior, we might need to be able to configure it.

Do we really need exitPumpGrace option (default 100ms) configures the grace period.? Cannot we just exit on process exit and finish the loop? Why we cannot do it precisely and immediately? We cannot subscribe to required events or what?

Also inside the loop we should have ability to stop the process. For example we may be can save

$`sh -c 'sleep 30 & echo done'`

to a variable, and access some fields or functions or both for the user to choose. See how competitors do it or what are best practices for perfect API.

We may need to support both AbortSignal style (by giving access to it) and also explicit cancel/stop.

@konard konard marked this pull request as draft June 9, 2026 22:03
@konard

konard commented Jun 9, 2026

Copy link
Copy Markdown
Member Author

🤖 AI Work Session Started

Starting automated work session at 2026-06-09T22:03:41.810Z

The PR has been converted to draft mode while work is in progress.

This comment marks the beginning of an AI work session. Please wait for the session to finish, and provide your feedback.

Address PR review feedback: document and test stopping a long-running
command from inside the stream() loop, and clarify the exitPumpGrace
rationale.

- Add regression tests: kill() inside the loop ends the iterator with a
  terminating exit chunk; break also kills the process; and a normal
  command's exit chunk is yielded with zero added latency (the grace
  only applies when a grandchild holds the pipe open).
- README: document stopping from inside the loop (kill() / break).
- Example: demonstrate kill() inside the loop.
- Changeset: note the stop-in-loop behavior and that grace adds no
  latency for ordinary commands.
@konard

konard commented Jun 9, 2026

Copy link
Copy Markdown
Member Author

Thanks for the review — I dug into all three points. Summary: exit detection is already immediate and event-driven; the grace period is not about detecting exit, only about not truncating the last line of output; and stopping from inside the loop already works (now documented + tested).

1. No breaking changes

The full suite passes with this branch (only the pre-existing jq-not-installed failures remain, identical with/without the change — CI installs jq, so they pass there). None of the existing stream() / pipe / interactive-streaming tests broke from the new exit chunk.

The only behavior change is the additive { type: 'exit', code } chunk. The README's stream() examples have always guarded on chunk.type before reading chunk.data, so correctly-written consumers are unaffected; the only code that breaks is code that assumed every chunk has .data (undocumented). That's why it's a minor bump. If you'd still prefer it behind a flag, I can gate exit-chunk emission on an option (default on) — just say the word.

2. Do we really need exitPumpGrace? Why not exit immediately on process exit?

We do exit immediately on process exit — createExitPromise() resolves on the child 'exit' event (not 'close'), so detection is precise and event-driven, exactly as you suggest. The grace period is a separate concern: it's the bounded wait to drain output the OS pipe still holds after the process exits.

The subtlety is that the child's 'exit' event and the final 'data'/EOF on its stdout are independent, unordered events. If we aborted the pumps the instant 'exit' fires, the last buffered line could be lost. So we let the pumps drain — and crucially:

  • Normal commands: zero added latency. The pumps reach EOF within a microtask of exit, win the race, and we return immediately. Measured: echo quick through stream() completes in 0–1ms; the 100ms timer never elapses.

  • The grace only applies in the pathological case — a grandchild inherited the pipe and keeps it open forever (sh -c 'sleep 5 & echo done'). There, EOF never comes, so a timeout is the only way to stop: we can't distinguish "more output coming" from "idle pipe held by a grandchild." Measured wall-clock in that case by grace setting:

    exitPumpGrace elapsed output captured
    0 ~11ms done ✅ (but racy — can truncate the trailing chunk)
    100 (default) ~106ms done
    200 ~206ms done

So: immediate exit detection ✔, no latency for normal commands ✔, and the grace is a safety margin only for the open-grandchild-pipe edge case. It's already configurable (set exitPumpGrace: 0 for strict immediacy), and the default of 100ms trades ~100ms in that rare case for not truncating output.

3. Ability to stop the process from inside the loop

This works today — I've now documented it and added regression tests. Two ways:

const cmd = $`some-endless-stream`;
for await (const chunk of cmd.stream()) {
  if (chunk.type === 'stdout' && enough(chunk)) {
    cmd.kill();          // loop then ends with an exit chunk (code 143 for SIGTERM)
  } else if (chunk.type === 'exit') {
    console.log('stopped with code', chunk.code);
  }
}

// Or just break — the iterator's cleanup kills the process:
for await (const chunk of $`some-endless-stream`.stream()) {
  if (done(chunk)) break;
}

New tests in stream-exit-chunks.test.mjs:

  • kill() inside the loop stops promptly and still yields a terminating exit chunk
  • break inside the loop terminates the process (cmd.finished === true)
  • a normal command's exit chunk arrives with zero added latency

All passing under both Bun and Node. Pushed in 9643a99.

@konard konard marked this pull request as ready for review June 9, 2026 22:15
@konard

konard commented Jun 9, 2026

Copy link
Copy Markdown
Member Author

🤖 Solution Draft Log

This log file contains the complete execution trace of the AI solution draft process.

💰 Cost: $3.866656

📊 Context and tokens usage:

  • 104.3K / 1M (10%) input tokens, 31.2K / 128K (24%) output tokens

Total: (4.9K new + 91.4K cache writes + 5.0M cache reads) input tokens, 31.2K output tokens, $3.866656 cost

🤖 Models used:

  • Tool: Anthropic Claude Code
  • Requested: opus
  • Model: Claude Opus 4.8 (claude-opus-4-8)

📎 Log file uploaded as Gist (1843KB)


Now working session is ended, feel free to review and add any feedback on the solution draft.

@konard

konard commented Jun 9, 2026

Copy link
Copy Markdown
Member Author

✅ Ready to merge

This pull request is now ready to be merged:

  • All CI checks have passed
  • No merge conflicts
  • No pending changes

Monitored by hive-mind with --auto-restart-until-mergeable flag

@konard

konard commented Jun 9, 2026

Copy link
Copy Markdown
Member Author

Can we configure signals we stop with also? And double check all the same is implemented in Rust syntax (if Rust supports something similar like some async iterators and so on).

And anyway also double check that all features that are supported in JavaScript are fully supported in Rust and we have CI/CD rules, that check that we do changes in both languages always.

@konard konard marked this pull request as draft June 9, 2026 23:01
@konard

konard commented Jun 9, 2026

Copy link
Copy Markdown
Member Author

🤖 AI Work Session Started

Starting automated work session at 2026-06-09T23:01:31.308Z

The PR has been converted to draft mode while work is in progress.

This comment marks the beginning of an AI work session. Please wait for the session to finish, and provide your feedback.

konard added 5 commits June 9, 2026 23:16
- Add a killSignal option (default SIGTERM) used by argument-less kill(),
  break out of a stream() loop, and external AbortSignal cancellation.
- An explicit kill(signal) argument still overrides the configured signal.
- Exit codes follow the conventional 128 + signal mapping (SIGTERM=>143,
  SIGINT=>130, SIGKILL=>137, ...) resolved from os.constants.signals.
- Fix a pre-existing hang: awaiting a command while an external AbortSignal
  fired never resolved because the abort listener was only registered on the
  start({...}) path. Register it (idempotently) in _doStartAsync so the
  await/then path honors options.signal too.
- Tests + README updated for #155.
Bring the Rust StreamingRunner/OutputStream to parity with the JavaScript
stream() implementation:

- Yield an explicit OutputChunk::Exit(code) when the process exits.
- No-hang on open pipes: wait on the child exit independently of the stdio
  readers (the old code awaited the readers first, which hangs when a
  grandchild keeps the pipe open), then drain readers for a grace period and
  abort lingering ones. The exit chunk is always delivered.
- Configurable stop signal via StreamingRunner::kill_signal (default SIGTERM),
  mirroring the JS killSignal option; exit codes use the 128 + signal mapping.
- Stop from inside the loop with OutputStream::kill()/kill_with(signal);
  dropping the stream (break) stops the process too (Drop sends a kill).
- Run the child in its own process group so signals reach grandchildren.

Adds Rust regression tests, a changelog fragment, and README docs.
Add a Language parity check workflow that fails a PR when js/src/** changes
without rust/src/** (or vice versa), so behavioral changes are always made in
both implementations. A 'parity-exempt' label skips the check for intentionally
single-language PRs.

Implements the CI/CD parity enforcement requested in the #155 review.
@konard

konard commented Jun 9, 2026

Copy link
Copy Markdown
Member Author

Working session summary

CI is still running (parity already green). The background poll will notify me when JS and Rust complete. Let me wait for it.

I have completed the substantive work and am now waiting on CI confirmation for the final commit (9a3035a). Here's where things stand:

Done this session (finalization):

  • ✅ Confirmed the 39 local JS test "failures" are identical before and after my changes (pure environment/sandbox issues — no jq/gh/TTY — zero regressions, verified via a baseline worktree at 9643a99).
  • ✅ Caught and fixed two real CI failures my sandbox couldn't see (it has no eslint binary path / different prettier scope):
    • eslint errors in my js/experiments/ repro scripts → 4641554... fixed in 1876f49.
    • prettier format:check on js/README.md examples → fixed in 9a3035a.
  • ✅ All local gates now pass: eslint ., prettier --check ., jscpd ., JS stream tests (11/11), Rust stream tests (13/13), cargo fmt --check, and the parity script.
  • ✅ Updated PR fix(stream): yield exit chunks and prevent hang on open pipes (#155) #169 description with three new "Review follow-up" sections (configurable signal, Rust parity, CI parity rule).
  • ✅ Reviewer reply drafted (ready to post once CI is green).

Remaining: confirm JS + Rust CI go green on 9a3035a, then post the reviewer reply and mark PR ready (gh pr ready 169).


This summary was automatically extracted from the AI working session output.

@konard konard marked this pull request as ready for review June 9, 2026 23:39
@konard

konard commented Jun 9, 2026

Copy link
Copy Markdown
Member Author

🤖 Solution Draft Log

This log file contains the complete execution trace of the AI solution draft process.

💰 Cost: $10.401908

📊 Context and tokens usage:

Claude Opus 4.8: (3 sub-sessions)

  1. 116.9K / 1M (12%) input tokens, 34.0K / 128K (27%) output tokens
  2. 116.7K / 1M (12%) input tokens, 39.0K / 128K (30%) output tokens
  3. 63.3K / 1M (6%) input tokens, 15.6K / 128K (12%) output tokens

Total: (29.6K new + 335.2K cache writes + 11.2M cache reads) input tokens, 103.0K output tokens, $10.401909 cost

🤖 Models used:

  • Tool: Anthropic Claude Code
  • Requested: opus
  • Model: Claude Opus 4.8 (claude-opus-4-8)

📎 Log file uploaded as Gist (4794KB)


Now working session is ended, feel free to review and add any feedback on the solution draft.

@konard

konard commented Jun 9, 2026

Copy link
Copy Markdown
Member Author

✅ Ready to merge

This pull request is now ready to be merged:

  • All CI checks have passed
  • No merge conflicts
  • No pending changes

Monitored by hive-mind with --auto-restart-until-mergeable flag

@konard konard merged commit a54f559 into main Jun 10, 2026
22 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

stream() async iterator does not yield exit chunks and can hang if process stdout stays open

1 participant