Add output pipe support#13
Conversation
|
Warning Review limit reached
Your plan currently allows 1 review/hour. Refill in 42 minutes and 33 seconds. Your organization has run out of usage credits. Purchase more in the billing tab. ⌛ How to resolve this issue?After more review capacity refills, a review can be triggered using the We recommend that you space out your commits to avoid hitting the rate limit. 🚦 How do rate limits work?CodeRabbit enforces hourly rate limits for each developer per organization. Our paid plans have higher rate limits than trial, open-source, and free plans. In all cases, review capacity refills continuously over time. Please see our FAQ for further information. ℹ️ Review info⚙️ Run configurationConfiguration used: Organization UI Review profile: ASSERTIVE Plan: Pro Run ID: ⛔ Files ignored due to path filters (1)
📒 Files selected for processing (27)
📝 WalkthroughWalkthroughThis PR implements buffer pipe functionality—auxiliary subprocess management that ingests PTY output. Changes touch FlatBuffers schema and codec, server runtime/model and keeper, client event handling, CLI commands ( ChangesBuffer Pipe Feature Implementation
Sequence Diagram(s)sequenceDiagram
participant CLI
participant Server
participant Runtime
participant KeeperPipe
participant PTY
CLI->>Server: BufferRequest::StartPipe(command, env)
Server->>Runtime: start_pipe(command, cwd, env)
Runtime->>KeeperPipe: spawn subprocess (piped stdin)
KeeperPipe-->>Runtime: PipeStatus(Running, pid)
Runtime-->>Server: BufferRuntimePipeStatus
Server->>Server: update buffer.pipe
Server-->>CLI: BufferResponse::Success
PTY->>Runtime: read output bytes
Runtime->>KeeperPipe: write bytes -> pipe stdin (bounded queue)
KeeperPipe->>KeeperPipe: queue write (non-blocking)
PTY->>Runtime: PTY exits
Runtime->>KeeperPipe: stop_pipe_after_exit()
KeeperPipe-->>Runtime: PipeStatus(Stopped, reason=BufferExited)
Runtime-->>Server: BufferRuntimeUpdate.pipe
Server->>Server: update buffer.pipe
Server-->>CLI: ServerEvent::BufferPipeChanged
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes Possibly related PRs
Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (3)
crates/embers-server/src/server.rs (1)
2734-2762:⚠️ Potential issue | 🟠 Major | ⚡ Quick winBuffer-exit path updates pipe state without stopping the actual pipe process.
On buffer exit, you only mutate in-memory pipe state (
stop_buffer_pipe) and emit events. The runtime pipe subprocess is not explicitly stopped here, so it can continue running after the parent buffer has exited.Suggested fix sketch
async fn record_buffer_exit(&self, buffer_id: BufferId, exit_code: Option<i32>) { let should_interrupt = self.take_buffer_shutdown_intent(buffer_id); + if !should_interrupt { + if let Some(runtime) = self.buffer_runtimes.lock().await.get(&buffer_id).cloned() { + let _ = runtime.stop_pipe().await; + } + } if should_interrupt { let runtime = self.buffer_runtimes.lock().await.remove(&buffer_id); drop(runtime); }🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@crates/embers-server/src/server.rs` around lines 2734 - 2762, The code path that handles buffer exit (the branch where mark_buffer_exited(buffer_id, exit_code) returns Ok) only updates in-memory pipe state via stop_buffer_pipe and emits buffer_pipe_changed_event but does not terminate the actual runtime subprocess; update the Ok branch in server.rs (the block around mark_buffer_exited, stop_buffer_pipe and buffer_pipe_changed_event) so that when pipe_changed is true you also locate the runtime pipe process (e.g., using buffer_pid_hint(&buffer.state) or the buffer's runtime PID) and explicitly stop/kill it by calling the appropriate runtime/manager method (e.g., state/runtime_pipe_manager.stop/terminate the pid) and await/handle errors before emitting events; ensure this new stop call is only invoked for the BufferExited path (not the RuntimeInterrupted path) and that any errors are logged/handled.crates/embers-protocol/src/codec.rs (1)
221-241:⚠️ Potential issue | 🟠 Major | ⚡ Quick winReject empty
StartPipe.commandat protocol boundary.Line 237/Line 3480 currently allow
StartPipewith an empty command vector. This should be rejected during request validation to fail fast and keep request contracts consistent.Proposed validation patch
fn validate_buffer_request(req: &BufferRequest) -> Result<(), ProtocolError> { match req { BufferRequest::Create { .. } => Ok(()), + BufferRequest::StartPipe { + buffer_id, + command, + .. + } => { + validate_required_buffer_id(*buffer_id, "buffer_request.buffer_id")?; + if command.is_empty() { + return Err(ProtocolError::InvalidMessageOwned( + "buffer_request.command must not be empty".to_owned(), + )); + } + Ok(()) + } BufferRequest::List { session_id, .. } => { validate_optional_session_id(*session_id, "buffer_request.session_id") } BufferRequest::Get { buffer_id, .. } | BufferRequest::Inspect { buffer_id, .. } @@ - | BufferRequest::OpenHistory { buffer_id, .. } - | BufferRequest::StartPipe { buffer_id, .. } + | BufferRequest::OpenHistory { buffer_id, .. } | BufferRequest::StopPipe { buffer_id, .. } => { validate_required_buffer_id(*buffer_id, "buffer_request.buffer_id") } } }Also applies to: 3474-3486
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@crates/embers-protocol/src/codec.rs` around lines 221 - 241, validate_buffer_request currently allows BufferRequest::StartPipe with an empty command vector; update validate_buffer_request to explicitly match BufferRequest::StartPipe { buffer_id, command, .. } and call a new or existing validation to reject an empty command (e.g., return Err(ProtocolError::InvalidField("buffer_request.command")) when command.is_empty()) in addition to validating buffer_id via validate_required_buffer_id(*buffer_id, "buffer_request.buffer_id"); make the same non-empty check for any other StartPipe occurrences so StartPipe.command is rejected at the protocol boundary.crates/embers-protocol/tests/family_round_trip.rs (1)
11-315: 🛠️ Refactor suggestion | 🟠 Major | ⚡ Quick winAdd round-trip cases for the new pipe protocol surface.
The updated family tests don’t currently exercise
BufferRequest::StartPipe,BufferRequest::StopPipe,ServerEvent::BufferPipeChanged, or aBufferRecordwithpipe: Some(...). Add these to the family matrix so new wire mappings are protected by the same round-trip guarantees.Also applies to: 336-481, 650-668
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@crates/embers-protocol/tests/family_round_trip.rs` around lines 11 - 315, The test's messages vector is missing round-trip cases for the new pipe protocol: add ClientMessage::Buffer(BufferRequest::StartPipe { request_id: RequestId(...), buffer_id: BufferId(...), pipe: PipeDescriptor{...} }) and ClientMessage::Buffer(BufferRequest::StopPipe { request_id: RequestId(...), buffer_id: BufferId(...), pipe_id: ... }) to the messages list, then add corresponding server-side cases (ServerEvent::BufferPipeChanged { ... }) and a BufferRecord instance with pipe: Some(...) wherever the family matrix defines server responses (the same sections that contain BufferRecord, ServerEvent, and BufferRequest test entries) so StartPipe/StopPipe and BufferPipeChanged are included in the round-trip assertions.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@crates/embers-protocol/schema/embers.fbs`:
- Around line 594-603: The Envelope field IDs were renumbered starting at
buffer_detached_event (and subsequent fields) which breaks wire compatibility;
revert buffer_detached_event, node_changed_event, floating_changed_event,
focus_changed_event, render_invalidated_event, session_renamed_event,
client_changed_event, buffer_location_response, and
buffer_with_location_response to their original numeric IDs in the Envelope
definition and do not change existing IDs; any new fields must be added after
the current highest id (append-only) to preserve backward/forward compatibility.
In `@crates/embers-server/src/buffer_runtime.rs`:
- Around line 683-689: The synchronous pipe-forwarding in buffer_runtime.rs's
write method (the block that accesses self.child.stdin and calls
write_all/flush) can block the PTY read loop; instead create a bounded
async/worker queue (e.g., a bounded tokio mpsc channel or background thread with
a crossbeam bounded channel) and change write to enqueue the bytes (or a copy)
to that channel; run a dedicated writer task that performs the actual blocking
write_all/flush to self.child.stdin and handles slow consumers
(drop/metric/timeout/backpressure policy) so the read thread never performs
blocking I/O. Apply the same refactor to the other pipe-forwarding occurrence in
this module (the second inline stdin write path) so all pipe writes go through
the same bounded writer worker.
---
Outside diff comments:
In `@crates/embers-protocol/src/codec.rs`:
- Around line 221-241: validate_buffer_request currently allows
BufferRequest::StartPipe with an empty command vector; update
validate_buffer_request to explicitly match BufferRequest::StartPipe {
buffer_id, command, .. } and call a new or existing validation to reject an
empty command (e.g., return
Err(ProtocolError::InvalidField("buffer_request.command")) when
command.is_empty()) in addition to validating buffer_id via
validate_required_buffer_id(*buffer_id, "buffer_request.buffer_id"); make the
same non-empty check for any other StartPipe occurrences so StartPipe.command is
rejected at the protocol boundary.
In `@crates/embers-protocol/tests/family_round_trip.rs`:
- Around line 11-315: The test's messages vector is missing round-trip cases for
the new pipe protocol: add ClientMessage::Buffer(BufferRequest::StartPipe {
request_id: RequestId(...), buffer_id: BufferId(...), pipe: PipeDescriptor{...}
}) and ClientMessage::Buffer(BufferRequest::StopPipe { request_id:
RequestId(...), buffer_id: BufferId(...), pipe_id: ... }) to the messages list,
then add corresponding server-side cases (ServerEvent::BufferPipeChanged { ...
}) and a BufferRecord instance with pipe: Some(...) wherever the family matrix
defines server responses (the same sections that contain BufferRecord,
ServerEvent, and BufferRequest test entries) so StartPipe/StopPipe and
BufferPipeChanged are included in the round-trip assertions.
In `@crates/embers-server/src/server.rs`:
- Around line 2734-2762: The code path that handles buffer exit (the branch
where mark_buffer_exited(buffer_id, exit_code) returns Ok) only updates
in-memory pipe state via stop_buffer_pipe and emits buffer_pipe_changed_event
but does not terminate the actual runtime subprocess; update the Ok branch in
server.rs (the block around mark_buffer_exited, stop_buffer_pipe and
buffer_pipe_changed_event) so that when pipe_changed is true you also locate the
runtime pipe process (e.g., using buffer_pid_hint(&buffer.state) or the buffer's
runtime PID) and explicitly stop/kill it by calling the appropriate
runtime/manager method (e.g., state/runtime_pipe_manager.stop/terminate the pid)
and await/handle errors before emitting events; ensure this new stop call is
only invoked for the BufferExited path (not the RuntimeInterrupted path) and
that any errors are logged/handled.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: f2699be7-4a49-41a8-86f1-53739cec0cea
⛔ Files ignored due to path filters (1)
Cargo.lockis excluded by!**/*.lock
📒 Files selected for processing (27)
Cargo.tomlcrates/embers-cli/Cargo.tomlcrates/embers-cli/src/automation.rscrates/embers-cli/src/lib.rscrates/embers-cli/tests/automation.rscrates/embers-cli/tests/integration.rscrates/embers-cli/tests/panes.rscrates/embers-client/src/client.rscrates/embers-client/src/configured_client.rscrates/embers-client/src/presentation.rscrates/embers-client/src/state.rscrates/embers-client/tests/configured_client.rscrates/embers-client/tests/presentation.rscrates/embers-client/tests/reducer.rscrates/embers-client/tests/support/mod.rscrates/embers-protocol/schema/embers.fbscrates/embers-protocol/src/codec.rscrates/embers-protocol/src/lib.rscrates/embers-protocol/src/types.rscrates/embers-protocol/tests/family_round_trip.rscrates/embers-server/src/buffer_runtime.rscrates/embers-server/src/lib.rscrates/embers-server/src/model.rscrates/embers-server/src/protocol.rscrates/embers-server/src/server.rscrates/embers-server/tests/client_sessions.rscrates/embers-server/tests/persistence.rs
There was a problem hiding this comment.
Actionable comments posted: 4
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
crates/embers-server/src/buffer_runtime.rs (1)
1223-1227:⚠️ Potential issue | 🟠 Major | ⚡ Quick winStop the owned pipe when the buffer process exits.
keeper_wait_looponly records the PTY exit code. If an output pipe is still running here, its stdin stays open until some external caller remembers to sendStopPipeAfterExit, so the keeper can leak the pipe subprocess and continue reporting it as running after the source buffer is gone. This cleanup should happen in the keeper itself.Suggested fix
fn keeper_wait_loop(runtime: Arc<KeeperRuntime>, mut child: Box<dyn Child + Send + Sync>) { let exit_code = child.wait().ok().and_then(exit_status_code); if let Ok(mut state) = runtime.exit_code.lock() { *state = Some(exit_code); } + if let Ok(mut pipe_slot) = runtime.pipe.lock() { + if let Some(pipe) = pipe_slot.as_mut() { + if pipe.status().running { + let _ = pipe.stop(BufferRuntimePipeStopReason::BufferExited); + } + } + } }🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@crates/embers-server/src/buffer_runtime.rs` around lines 1223 - 1227, keeper_wait_loop currently only records the PTY exit code; after the child exits you must also trigger the keeper to stop any associated output pipe so its stdin is closed and the pipe subprocess is reaped. Update keeper_wait_loop (which has signature fn keeper_wait_loop(runtime: Arc<KeeperRuntime>, mut child: Box<dyn Child + Send + Sync>)) to, after setting *state = Some(exit_code), also notify the runtime to stop pipes for that keeper—e.g., send the StopPipeAfterExit command or call the runtime method that enqueues/handles StopPipeAfterExit (use the KeeperRuntime API responsible for pipe control) so any running output pipe is stopped immediately when the PTY process exits.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@crates/embers-cli/src/lib.rs`:
- Around line 528-588: The response handlers for BufferCommand::Pipe (the
BufferPipeCommand::Start, Stop, and Show arms) must enforce that the server
returned the same buffer id that was requested; capture the requested BufferId
(e.g. let expected = BufferId(buffer_id)) before sending the request, and after
receiving the ServerResponse::Buffer verify response.buffer.id == expected,
returning Err(MuxError::protocol(...)) if they differ, and only call
format_buffer_pipe_details when the ids match; apply this check in the Start,
Stop, and Show match arms.
In `@crates/embers-cli/tests/automation.rs`:
- Around line 53-80: The for-loop with a fixed 0..4 cap can drop expected
records; replace it with a loop that continues reading via read_record(&mut
lines).await until both saw_response and saw_event are true (or fail with a
clear timeout), i.e. remove the fixed 4 iteration bound around the match on
record["kind"] and instead loop indefinitely (or loop with a deadline/attempt
counter) checking the saw_response and saw_event flags after each record; keep
the existing match branches (response/event) and break when both flags are true
to avoid flakiness.
In `@crates/embers-client/src/client.rs`:
- Around line 324-326: The branch handling ServerEvent::BufferPipeChanged is
redundantly triggering a remote fetch via refresh_buffer_record (which issues a
BufferRequest::Get) even though the new buffer state has already been applied by
ClientState::apply_event; remove the refresh_buffer_record call and instead rely
on the already-applied event buffer (or update any local indices/derivations
from event.buffer directly) so we avoid the extra network round-trip and
race-prone fetch.
In `@crates/embers-protocol/src/codec.rs`:
- Around line 1267-1288: validate_buffer_pipe_record currently only validates
the Running state; add a check for BufferPipeState::Stopped in
validate_buffer_pipe_record (function name) to reject records where both
record.stop_reason and record.exit_code are None, returning a
ProtocolError::InvalidMessageOwned with a clear message (e.g.,
"{field}.stop_reason or {field}.exit_code required for stopped pipe"). Keep
existing checks for command and Running; reference the BufferPipeRecord type and
BufferPipeState enum and their fields stop_reason and exit_code when adding this
validation.
---
Outside diff comments:
In `@crates/embers-server/src/buffer_runtime.rs`:
- Around line 1223-1227: keeper_wait_loop currently only records the PTY exit
code; after the child exits you must also trigger the keeper to stop any
associated output pipe so its stdin is closed and the pipe subprocess is reaped.
Update keeper_wait_loop (which has signature fn keeper_wait_loop(runtime:
Arc<KeeperRuntime>, mut child: Box<dyn Child + Send + Sync>)) to, after setting
*state = Some(exit_code), also notify the runtime to stop pipes for that
keeper—e.g., send the StopPipeAfterExit command or call the runtime method that
enqueues/handles StopPipeAfterExit (use the KeeperRuntime API responsible for
pipe control) so any running output pipe is stopped immediately when the PTY
process exits.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: e383b921-258f-49bc-a86e-a671112b895e
⛔ Files ignored due to path filters (1)
Cargo.lockis excluded by!**/*.lock
📒 Files selected for processing (27)
Cargo.tomlcrates/embers-cli/Cargo.tomlcrates/embers-cli/src/automation.rscrates/embers-cli/src/lib.rscrates/embers-cli/tests/automation.rscrates/embers-cli/tests/integration.rscrates/embers-cli/tests/panes.rscrates/embers-client/src/client.rscrates/embers-client/src/configured_client.rscrates/embers-client/src/presentation.rscrates/embers-client/src/state.rscrates/embers-client/tests/configured_client.rscrates/embers-client/tests/presentation.rscrates/embers-client/tests/reducer.rscrates/embers-client/tests/support/mod.rscrates/embers-protocol/schema/embers.fbscrates/embers-protocol/src/codec.rscrates/embers-protocol/src/lib.rscrates/embers-protocol/src/types.rscrates/embers-protocol/tests/family_round_trip.rscrates/embers-server/src/buffer_runtime.rscrates/embers-server/src/lib.rscrates/embers-server/src/model.rscrates/embers-server/src/protocol.rscrates/embers-server/src/server.rscrates/embers-server/tests/client_sessions.rscrates/embers-server/tests/persistence.rs
99d273a to
8cbf65a
Compare
There was a problem hiding this comment.
Actionable comments posted: 4
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
crates/embers-server/src/buffer_runtime.rs (1)
1242-1256:⚠️ Potential issue | 🟠 Major | ⚡ Quick winEmit a final pipe update when status polling aborts.
If
connection.status()fails whilelast_pipeis set, this branch only callson_exit. Sinceon_exithas no pipe payload, consumers never learn that the pipe is gone/interrupted and can keep rendering the previous running state until a later resync.🧭 Minimal fix
Err(error) => { error!(%error, %inner.buffer_id, "status poll failed"); + if last_pipe.is_some() { + (callbacks.on_output)( + inner.buffer_id, + BufferRuntimeUpdate { + sequence: last_sequence, + activity: last_activity, + title: None, + pipe: Some(None), + }, + ); + } (callbacks.on_exit)(inner.buffer_id, None); break; }Also applies to: 1264-1280
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@crates/embers-server/src/buffer_runtime.rs` around lines 1242 - 1256, When connection.status() fails in the status polling loop, emit a final pipe update before calling (callbacks.on_exit) so consumers know the pipe was lost; specifically, if last_pipe (from initial.pipe) is Some(_) or saw_exit is false, call the same pipe-update callback used elsewhere with inner.buffer_id and a None/empty pipe payload (or the appropriate "removed" pipe value) to signal removal, then set last_pipe to None and proceed to invoke (callbacks.on_exit)(inner.buffer_id, None). Apply the same change to the other status-failure branch handling (the second connection.status() error path) so both failure points update last_pipe/notify the pipe removal before calling on_exit.
♻️ Duplicate comments (1)
crates/embers-protocol/src/codec.rs (1)
1288-1293:⚠️ Potential issue | 🟡 Minor | ⚡ Quick winRequire
stop_reasonon stopped pipe records.The current producer side always sets a terminal
stop_reasonfor stopped pipes, so acceptingstate=Stoppedwith onlyexit_codekeeps an ambiguous wire state valid that this PR never emits.🔒 Suggested hardening
- if matches!(record.state, BufferPipeState::Stopped) { - if record.stop_reason.is_none() && record.exit_code.is_none() { + if matches!(record.state, BufferPipeState::Stopped) { + if record.stop_reason.is_none() { return Err(ProtocolError::InvalidMessageOwned(format!( - "{field}.stop_reason or {field}.exit_code required for stopped pipe" + "{field}.stop_reason is required for a stopped pipe" ))); } }🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@crates/embers-protocol/src/codec.rs` around lines 1288 - 1293, The decoder currently allows BufferPipeState::Stopped records that have only exit_code set, but the producer always sets stop_reason; update the validation in crates/embers-protocol/src/codec.rs (the branch checking matches!(record.state, BufferPipeState::Stopped)) to require record.stop_reason.is_some() (i.e. return ProtocolError::InvalidMessageOwned if stop_reason is None) instead of accepting an exit_code-only record, and adjust the error text to say "{field}.stop_reason required for stopped pipe" while keeping reference to ProtocolError::InvalidMessageOwned and the BufferPipeState::Stopped check.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@crates/embers-cli/src/lib.rs`:
- Around line 382-387: The branches matching Command::Attach, Command::Serve,
Command::RuntimeKeeper and Command::Automation in execute_command() currently
return MuxError::internal(...) which incorrectly classifies caller mistakes as
internal bugs; replace that call with a user-facing error constructor (e.g.,
MuxError::user(...) or the crate's equivalent user-input error variant) so the
error becomes a stable user-input/invalid-request error with the same message
"interactive commands must be dispatched through run()"; update the return in
those arms to use the user-facing MuxError variant instead of
MuxError::internal.
- Around line 529-543: The BufferPipeCommand::Start branch currently forwards an
empty command array to the server; add a local validation right before
building/sending BufferRequest::StartPipe to reject command.is_empty() and
return a user-facing error (or bail/exit) like other validators in this file
instead of calling connection.request/new_request_id; place the check in the
Start arm (before constructing the request) and ensure it mirrors the CLI's
existing error handling semantics used elsewhere.
In `@crates/embers-protocol/src/types.rs`:
- Around line 208-215: Add explicit validation in BufferRecord::validate() for
BufferPipeRecord field combos: ensure for a pipe record with state ==
BufferPipeState::Running that stop_reason and exit_code are None; for state ==
BufferPipeState::Stopped that stop_reason is Some (and exit_code must be absent
unless your domain allows an exit code with Stopped); and for any
terminal/finished state (e.g., BufferPipeState::Finished or equivalent) require
exit_code.is_some() and stop_reason.is_none_or-consistent per your enum
semantics; on any contradiction return an Err describing the bad combination.
Reference BufferPipeRecord, BufferPipeState, BufferPipeStopReason in your checks
and mirror the same validations in the other mentioned validation site (lines
~647-672) so malformed pipe records are rejected at the protocol boundary.
In `@crates/embers-server/src/server.rs`:
- Around line 2674-2711: The pipe field is being updated regardless of update
freshness, allowing stale runtime callbacks to rewind buffer.pipe and emit a
bogus BufferPipeChanged; only apply update.pipe when the update is fresh. Change
the logic in the update handling (the block that reads and mutates buffer in
server.rs) so that buffer.pipe = update.pipe... is executed only when
sequence_advanced (i.e., update.sequence > buffer.last_snapshot_seq) is true, or
introduce a separate monotonic pipe version on the update and buffer and compare
that before assigning; ensure buffer_pipe_changed_event is only triggered when
the pipe assignment passed the freshness check (references: buffer.pipe,
update.pipe, update.sequence, buffer.last_snapshot_seq, sequence_advanced,
start_buffer_pipe/stop_buffer_pipe, buffer_pipe_changed_event).
---
Outside diff comments:
In `@crates/embers-server/src/buffer_runtime.rs`:
- Around line 1242-1256: When connection.status() fails in the status polling
loop, emit a final pipe update before calling (callbacks.on_exit) so consumers
know the pipe was lost; specifically, if last_pipe (from initial.pipe) is
Some(_) or saw_exit is false, call the same pipe-update callback used elsewhere
with inner.buffer_id and a None/empty pipe payload (or the appropriate "removed"
pipe value) to signal removal, then set last_pipe to None and proceed to invoke
(callbacks.on_exit)(inner.buffer_id, None). Apply the same change to the other
status-failure branch handling (the second connection.status() error path) so
both failure points update last_pipe/notify the pipe removal before calling
on_exit.
---
Duplicate comments:
In `@crates/embers-protocol/src/codec.rs`:
- Around line 1288-1293: The decoder currently allows BufferPipeState::Stopped
records that have only exit_code set, but the producer always sets stop_reason;
update the validation in crates/embers-protocol/src/codec.rs (the branch
checking matches!(record.state, BufferPipeState::Stopped)) to require
record.stop_reason.is_some() (i.e. return ProtocolError::InvalidMessageOwned if
stop_reason is None) instead of accepting an exit_code-only record, and adjust
the error text to say "{field}.stop_reason required for stopped pipe" while
keeping reference to ProtocolError::InvalidMessageOwned and the
BufferPipeState::Stopped check.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: 422f6955-8645-4297-a0c3-45a42355b9ac
⛔ Files ignored due to path filters (1)
Cargo.lockis excluded by!**/*.lock
📒 Files selected for processing (27)
Cargo.tomlcrates/embers-cli/Cargo.tomlcrates/embers-cli/src/automation.rscrates/embers-cli/src/lib.rscrates/embers-cli/tests/automation.rscrates/embers-cli/tests/integration.rscrates/embers-cli/tests/panes.rscrates/embers-client/src/client.rscrates/embers-client/src/configured_client.rscrates/embers-client/src/presentation.rscrates/embers-client/src/state.rscrates/embers-client/tests/configured_client.rscrates/embers-client/tests/presentation.rscrates/embers-client/tests/reducer.rscrates/embers-client/tests/support/mod.rscrates/embers-protocol/schema/embers.fbscrates/embers-protocol/src/codec.rscrates/embers-protocol/src/lib.rscrates/embers-protocol/src/types.rscrates/embers-protocol/tests/family_round_trip.rscrates/embers-server/src/buffer_runtime.rscrates/embers-server/src/lib.rscrates/embers-server/src/model.rscrates/embers-server/src/protocol.rscrates/embers-server/src/server.rscrates/embers-server/tests/client_sessions.rscrates/embers-server/tests/persistence.rs
a2d63a8 to
22144b5
Compare
|
@coderabbitai review |
✅ Actions performedReview triggered.
|
There was a problem hiding this comment.
Actionable comments posted: 3
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@crates/embers-protocol/src/codec.rs`:
- Around line 197-207: The current validation only rejects an empty `command`
slice but allows a single empty string `[""]`, causing spawn failures later;
update `validate_non_empty_command` to also check that `command[0]` is not an
empty string and return Err(ProtocolError::InvalidMessageOwned(format!("{field}
first segment must not be empty"))) when it is, and likewise add the same
non-empty-first-segment check inside `BufferPipeRecord::validate()` so both
request-side and record-side validation reject `[""]` inputs.
In `@crates/embers-server/src/server.rs`:
- Around line 2686-2702: The code currently only applies update.pipe inside the
sequence_advanced branch, which drops pipe-only transitions; change the logic so
update.pipe is handled independently of sequence_advanced: always compare and
update buffer.pipe (using pipe.as_ref().map(model_pipe_from_runtime)), and if
the new pipe differs set render_invalidated = true and trigger the same
BufferPipeChanged behavior you use when sequence advances; apply the identical
fix to the earlier occurrence around the buffer handling at the 1296-1307 region
so pipe-only updates are not ignored.
- Around line 477-494: The restore failure path currently continues after
failing runtime.stop_pipe().await without ensuring the remote keeper process is
terminated, which can orphan a live runtime; update the error branch (around
runtime.stop_pipe().await and where status.pipe is handled) to explicitly
terminate the runtime/keeper before continuing — e.g., invoke a definitive
shutdown on the runtime handle (call runtime.terminate() or
runtime.force_terminate_keeper(), adding such a method if it doesn't exist),
ensure status.pipe is cleared (status.pipe = None) and persistently clear the
runtime socket via state.set_buffer_runtime_socket_path(buffer.id, None) and
mark the buffer interrupted as done today, then continue.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: 16d6a7ae-da46-4a7d-8c63-dcec263deae1
⛔ Files ignored due to path filters (1)
Cargo.lockis excluded by!**/*.lock
📒 Files selected for processing (27)
Cargo.tomlcrates/embers-cli/Cargo.tomlcrates/embers-cli/src/automation.rscrates/embers-cli/src/lib.rscrates/embers-cli/tests/automation.rscrates/embers-cli/tests/integration.rscrates/embers-cli/tests/panes.rscrates/embers-client/src/client.rscrates/embers-client/src/configured_client.rscrates/embers-client/src/presentation.rscrates/embers-client/src/state.rscrates/embers-client/tests/configured_client.rscrates/embers-client/tests/presentation.rscrates/embers-client/tests/reducer.rscrates/embers-client/tests/support/mod.rscrates/embers-protocol/schema/embers.fbscrates/embers-protocol/src/codec.rscrates/embers-protocol/src/lib.rscrates/embers-protocol/src/types.rscrates/embers-protocol/tests/family_round_trip.rscrates/embers-server/src/buffer_runtime.rscrates/embers-server/src/lib.rscrates/embers-server/src/model.rscrates/embers-server/src/protocol.rscrates/embers-server/src/server.rscrates/embers-server/tests/client_sessions.rscrates/embers-server/tests/persistence.rs
There was a problem hiding this comment.
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
crates/embers-protocol/src/codec.rs (1)
238-249:⚠️ Potential issue | 🟡 Minor | ⚡ Quick winApply the new command validation to
BufferRequest::Createas well.
StartPipenow rejects[]/[""], butCreatestill returnsOk(()), so the primary buffer launch path continues to accept the same malformed command shape.Suggested fix
- BufferRequest::Create { .. } => Ok(()), + BufferRequest::Create { command, .. } => { + validate_non_empty_command(command, "buffer_request.command") + } BufferRequest::List { session_id, .. } => { validate_optional_session_id(*session_id, "buffer_request.session_id") }🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@crates/embers-protocol/src/codec.rs` around lines 238 - 249, In validate_buffer_request, the Create arm currently returns Ok(()) without validating the command shape, so add the same check used for StartPipe: call validate_non_empty_command on the Create variant's command field (using the same error path string, e.g., "buffer_request.command") and return its Result; ensure you still validate any required buffer_id/session_id fields in other arms as before so both BufferRequest::Create and BufferRequest::StartPipe reject []/[""] command arrays consistently.
♻️ Duplicate comments (1)
crates/embers-server/src/server.rs (1)
2688-2708:⚠️ Potential issue | 🟠 Major | 🏗️ Heavy liftEqual-sequence callbacks can still rewind newer pipe state.
Runtime::start_buffer_pipe()andRuntime::stop_buffer_pipe()updatecrate::Buffer.pipeout of band, butRuntime::record_buffer_update()still acceptsupdate.pipewhenupdate.sequence == buffer.last_snapshot_seq. Because runtime callbacks are processed on spawned tasks, an older callback queued before the request can arrive later with the same sequence and overwrite the newer start/stop result. This still needs a pipe-specific freshness token or serialized pipe mutations.**🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@crates/embers-server/src/server.rs` around lines 2688 - 2708, The bug is that Runtime::record_buffer_update() accepts update.pipe when update.sequence == buffer.last_snapshot_seq, allowing out-of-order runtime callbacks (from Runtime::start_buffer_pipe/stop_buffer_pipe) to rewind newer pipe state; change the logic so pipe updates are only applied when they are strictly fresher or use a pipe-specific freshness token. Concretely, in Runtime::record_buffer_update() (the block referencing buffer.last_snapshot_seq and update.pipe), either restrict the pipe-handling condition from "sequence_current" (>=) to the stricter "sequence_advanced" (>) or add a buffer.pipe_sequence (and corresponding update.pipe_sequence) and compare those before overwriting buffer.pipe to ensure only newer pipe mutations are applied; update Runtime::start_buffer_pipe()/stop_buffer_pipe() to set the pipe sequence token if you choose the token approach.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@crates/embers-server/src/buffer_runtime.rs`:
- Around line 418-434: Add input validation to BufferRuntimeHandle::start_pipe
to reject not only an empty Vec but also a Vec whose first element is an empty
String (e.g., vec![String::new()]); before spawning the blocking task check if
command.is_empty() || command.first().map(|s| s.is_empty()).unwrap_or(true) and
return an appropriate Err(MuxError::...) consistent with StartPipe’s protocol
validation. Apply the same check to the other start_pipe occurrence noted in the
diff (the similar implementation at the later block) so callers that bypass the
protocol layer receive the same error as StartPipe.
In `@crates/embers-server/src/server.rs`:
- Around line 480-495: In the Err branch after failing runtime.stop_pipe(),
ensure we fully clear both in-memory and persisted pipe state for the buffer:
set status.pipe = None (clear the runtime pipe on the in-memory status) and call
the state mutation that clears the persisted socket (already present as
state.set_buffer_runtime_socket_path(buffer.id, None)); make sure these two
actions occur before marking the buffer interrupted (mark_buffer_interrupted)
and before continuing so buffer.pipe cannot later report stale metadata for
buffer.id (use buffer_pid_hint(&buffer.state) as before).
---
Outside diff comments:
In `@crates/embers-protocol/src/codec.rs`:
- Around line 238-249: In validate_buffer_request, the Create arm currently
returns Ok(()) without validating the command shape, so add the same check used
for StartPipe: call validate_non_empty_command on the Create variant's command
field (using the same error path string, e.g., "buffer_request.command") and
return its Result; ensure you still validate any required buffer_id/session_id
fields in other arms as before so both BufferRequest::Create and
BufferRequest::StartPipe reject []/[""] command arrays consistently.
---
Duplicate comments:
In `@crates/embers-server/src/server.rs`:
- Around line 2688-2708: The bug is that Runtime::record_buffer_update() accepts
update.pipe when update.sequence == buffer.last_snapshot_seq, allowing
out-of-order runtime callbacks (from
Runtime::start_buffer_pipe/stop_buffer_pipe) to rewind newer pipe state; change
the logic so pipe updates are only applied when they are strictly fresher or use
a pipe-specific freshness token. Concretely, in Runtime::record_buffer_update()
(the block referencing buffer.last_snapshot_seq and update.pipe), either
restrict the pipe-handling condition from "sequence_current" (>=) to the
stricter "sequence_advanced" (>) or add a buffer.pipe_sequence (and
corresponding update.pipe_sequence) and compare those before overwriting
buffer.pipe to ensure only newer pipe mutations are applied; update
Runtime::start_buffer_pipe()/stop_buffer_pipe() to set the pipe sequence token
if you choose the token approach.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: 6e9af006-02d8-4937-8c79-b0d6436785a0
⛔ Files ignored due to path filters (1)
Cargo.lockis excluded by!**/*.lock
📒 Files selected for processing (27)
Cargo.tomlcrates/embers-cli/Cargo.tomlcrates/embers-cli/src/automation.rscrates/embers-cli/src/lib.rscrates/embers-cli/tests/automation.rscrates/embers-cli/tests/integration.rscrates/embers-cli/tests/panes.rscrates/embers-client/src/client.rscrates/embers-client/src/configured_client.rscrates/embers-client/src/presentation.rscrates/embers-client/src/state.rscrates/embers-client/tests/configured_client.rscrates/embers-client/tests/presentation.rscrates/embers-client/tests/reducer.rscrates/embers-client/tests/support/mod.rscrates/embers-protocol/schema/embers.fbscrates/embers-protocol/src/codec.rscrates/embers-protocol/src/lib.rscrates/embers-protocol/src/types.rscrates/embers-protocol/tests/family_round_trip.rscrates/embers-server/src/buffer_runtime.rscrates/embers-server/src/lib.rscrates/embers-server/src/model.rscrates/embers-server/src/protocol.rscrates/embers-server/src/server.rscrates/embers-server/tests/client_sessions.rscrates/embers-server/tests/persistence.rs
- Added buffer-scoped, output-only pipe support through protocol, server, runtime, client, and CLI layers, including buffer pipe start|show|stop. - Added persistent embers automation mode with shell-style stdin parsing and NDJSON hello / response / event output. - Propagated BufferPipeChanged through client state/resync paths. - Closed the highest-risk coverage gaps with tests for CLI pipe flows, automation streaming, concurrent multi-client input, subscriber cleanup, and restart behavior
server, runtime, client, and CLI layers, including buffer pipe
start|show|stop.
and NDJSON hello / response / event output.
automation streaming, concurrent multi-client input, subscriber cleanup,
and restart behavior
Summary by CodeRabbit