From c63227c708ff6cad528ee3086df6f127c79ac063 Mon Sep 17 00:00:00 2001 From: Bruce Mitchener Date: Sun, 31 May 2026 15:45:54 +0700 Subject: [PATCH] execution_graph: validate graph API usage Return structured errors from graph construction and wiring APIs instead of silently ignoring bad nodes, inputs, or outputs. Preserve execution_tape TrapInfo in GraphError::Trap so callers can inspect the node, function, pc, span, and trap kind. Update examples, benchmark setup, docs, and regression tests for the Result-returning API. --- execution_graph/README.md | 10 +- execution_graph/src/dispatch.rs | 12 +- execution_graph/src/graph.rs | 450 +++++++++++++++---- execution_graph/src/pretty.rs | 35 +- execution_graph_examples/src/bin/tax.rs | 18 +- execution_graph_wind_tunnel/benches/graph.rs | 91 ++-- 6 files changed, 443 insertions(+), 173 deletions(-) diff --git a/execution_graph/README.md b/execution_graph/README.md index 8f86b2a..47bafa7 100644 --- a/execution_graph/README.md +++ b/execution_graph/README.md @@ -10,7 +10,7 @@ nodes and re-executes only the nodes that are affected by changes. - **Nodes** are `(VerifiedProgram, entry FuncId)` pairs. - **Edges** represent data dependencies; they are recorded dynamically from each node run: - reading an external input records `ResourceKey::Input(name)` - - reading another node's output records `ResourceKey::TapeOutput { node, output }` + - reading another node's output records `ResourceKey::NodeOutput { node, output }` - host ops can record additional dependencies via `execution_tape::host::AccessSink` - **Invalidation** is done by name: calling `invalidate_input("foo")` marks the input key `ResourceKey::Input("foo")` dirty, which may trigger re-execution of transitive dependents. @@ -24,6 +24,10 @@ Host state invalidation uses the same key space: if a host op records a via `ExecutionGraph::invalidate_tape_key(...)` (or by constructing the corresponding owned `execution_graph::ResourceKey` and calling `ExecutionGraph::invalidate(...)`). +Graph construction is checked at the public API boundary: `add_node`, `set_input_value`, and +`connect` return `GraphError` values for unknown entry functions, input arity mismatches, unknown +input names, and unknown output names. + ## Execution behavior `run_node` drains and executes only the dirty work within the dependency closure of the target @@ -46,5 +50,5 @@ cargo run -p execution_graph_examples --bin tax ## Current limitations -- Error reporting is intentionally minimal (`GraphError::Trap` is opaque); richer error surfaces - are expected to be layered on in follow-up PRs. +- `execution_graph` intentionally stays close to the VM: traps expose `execution_tape::vm::TrapInfo` + rather than source-language diagnostics. diff --git a/execution_graph/src/dispatch.rs b/execution_graph/src/dispatch.rs index f646e51..ac17d1b 100644 --- a/execution_graph/src/dispatch.rs +++ b/execution_graph/src/dispatch.rs @@ -177,8 +177,10 @@ mod tests { let (const_prog, const_entry) = make_const_program("value", 7); let mut g = ExecutionGraph::new(HostNoop, Limits::default()); - let n_err = g.add_node(needs_input_prog, needs_input_entry, vec!["in".into()]); - let n_ok = g.add_node(const_prog, const_entry, vec![]); + let n_err = g + .add_node(needs_input_prog, needs_input_entry, vec!["in".into()]) + .unwrap(); + let n_ok = g.add_node(const_prog, const_entry, vec![]).unwrap(); let plan = RunPlan::all(vec![n_err, n_ok]); let mut dispatcher = InlineDispatcher; @@ -199,8 +201,8 @@ mod tests { let (prog, entry) = make_const_program("value", 11); let mut g = ExecutionGraph::new(HostNoop, Limits::default()); - let n0 = g.add_node(prog.clone(), entry, vec![]); - let n1 = g.add_node(prog, entry, vec![]); + let n0 = g.add_node(prog.clone(), entry, vec![]).unwrap(); + let n1 = g.add_node(prog, entry, vec![]).unwrap(); let r0 = NodeRunDetail { node: n0, @@ -233,7 +235,7 @@ mod tests { fn inline_dispatcher_with_report_handles_short_trace_vectors() { let (prog, entry) = make_const_program("value", 5); let mut g = ExecutionGraph::new(HostNoop, Limits::default()); - let node = g.add_node(prog, entry, vec![]); + let node = g.add_node(prog, entry, vec![]).unwrap(); // Empty trace payload: execution should still succeed and simply produce no traced rows. let trace = RunPlanTrace::from_node_reports(vec![]); diff --git a/execution_graph/src/graph.rs b/execution_graph/src/graph.rs index e963deb..e423cf0 100644 --- a/execution_graph/src/graph.rs +++ b/execution_graph/src/graph.rs @@ -19,7 +19,7 @@ use execution_tape::host::SigHash; use execution_tape::trace::{TraceMask, TraceSink}; use execution_tape::value::{FuncId, Value}; use execution_tape::verifier::VerifiedProgram; -use execution_tape::vm::{ExecutionContext, Limits, Vm}; +use execution_tape::vm::{ExecutionContext, Limits, TrapInfo, Vm}; use hashbrown::HashMap; use crate::access::{Access, AccessLog, HostOpId, NodeId, ResourceKey}; @@ -40,6 +40,34 @@ use invalidation::trace::OneParentRecorder; pub enum GraphError { /// A node id was invalid. BadNodeId, + /// A function id was not present in the verified program supplied for a node. + BadEntryFunc { + /// Invalid entry function id. + func: FuncId, + }, + /// A node's declared graph inputs did not match its tape function arity. + BadInputArity { + /// Entry function id for the node being added. + func: FuncId, + /// Expected graph input count from the tape function signature. + expected: usize, + /// Actual input count supplied by the caller. + actual: usize, + }, + /// A named node input does not exist. + UnknownInput { + /// Node whose input was requested. + node: NodeId, + /// Input name. + name: Box, + }, + /// A named node output does not exist. + UnknownOutput { + /// Node whose output was requested. + node: NodeId, + /// Output name. + name: Box, + }, /// A required input binding was missing. MissingInput { /// Node that is missing the binding. @@ -69,13 +97,44 @@ pub enum GraphError { sig_hash: SigHash, }, /// VM execution trapped. - Trap, + Trap { + /// Node being executed when the VM trapped. + node: NodeId, + /// Underlying VM trap information. + trap: TrapInfo, + }, } impl fmt::Display for GraphError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { Self::BadNodeId => write!(f, "bad node id"), + Self::BadEntryFunc { func } => { + write!( + f, + "bad entry function: f{} is not in the node program", + func.0 + ) + } + Self::BadInputArity { + func, + expected, + actual, + } => write!( + f, + "bad node input arity: entry=f{} expected {expected} inputs, got {actual}", + func.0 + ), + Self::UnknownInput { node, name } => write!( + f, + "unknown node input: node={} input={name}; check the input_names passed to add_node(...)", + node.as_u64() + ), + Self::UnknownOutput { node, name } => write!( + f, + "unknown node output: node={} output={name}; check the producer's function output names", + node.as_u64() + ), Self::MissingInput { node, name } => { write!( f, @@ -107,7 +166,13 @@ impl fmt::Display for GraphError { node.as_u64(), sig_hash.0 ), - Self::Trap => write!(f, "vm trapped during graph node execution"), + Self::Trap { node, trap } => { + write!( + f, + "vm trapped during graph node execution: node={} {trap}", + node.as_u64() + ) + } } } } @@ -300,20 +365,33 @@ impl ExecutionGraph { /// Adds a node and returns its [`NodeId`]. /// /// `input_names` defines the mapping from per-node binding names to positional function args. + /// + /// Returns [`GraphError::BadEntryFunc`] if `entry` is not present in `program`, or + /// [`GraphError::BadInputArity`] if `input_names` does not match the entry function's + /// argument count. pub fn add_node( &mut self, program: Arc, entry: FuncId, input_names: Vec>, - ) -> NodeId { + ) -> Result { let node = NodeId::new(u64::try_from(self.nodes.len()).unwrap_or(u64::MAX)); let program_ref = program.program(); - let ret_count = program_ref + let func = program_ref .functions .get(entry.0 as usize) - .map(|f| f.ret_count as usize) - .unwrap_or(0); + .ok_or(GraphError::BadEntryFunc { func: entry })?; + let expected_inputs = func.arg_count as usize; + let actual_inputs = input_names.len(); + if actual_inputs != expected_inputs { + return Err(GraphError::BadInputArity { + func: entry, + expected: expected_inputs, + actual: actual_inputs, + }); + } + let ret_count = func.ret_count as usize; let mut output_names: Vec> = Vec::with_capacity(ret_count); for i in 0..ret_count { @@ -360,7 +438,7 @@ impl ExecutionGraph { }; self.nodes.push(n); - node + Ok(node) } /// Binds a named input to a concrete value. @@ -371,23 +449,30 @@ impl ExecutionGraph { /// /// If a node declares duplicate input names (for example `["x", "x"]`), those slots are /// treated as aliases: setting `"x"` binds all matching slots. - pub fn set_input_value(&mut self, node: NodeId, name: impl Into>, value: Value) { - let Ok(index) = usize::try_from(node.as_u64()) else { - return; - }; + /// + /// Returns [`GraphError::BadNodeId`] for an unknown node or [`GraphError::UnknownInput`] for + /// an input name that was not declared when the node was added. + pub fn set_input_value( + &mut self, + node: NodeId, + name: impl Into>, + value: Value, + ) -> Result<(), GraphError> { + let index = usize::try_from(node.as_u64()).map_err(|_| GraphError::BadNodeId)?; let name: Box = name.into(); // Validate node and slot exist before interning to avoid memory churn on bad inputs. - if self + let Some(slots) = self .nodes .get(index) .and_then(|n| n.input_slots.get(name.as_ref())) - .is_none() - { - return; - } + .cloned() + else { + let _ = self.nodes.get(index).ok_or(GraphError::BadNodeId)?; + return Err(GraphError::UnknownInput { node, name }); + }; let read_id = self.intern_input_id(name.as_ref()); let n = &mut self.nodes[index]; - for &slot in n.input_slots.get(name.as_ref()).unwrap() { + for slot in slots { if let Some(binding) = n.inputs.get_mut(slot) { *binding = Some(Binding::External { value: value.clone(), @@ -395,34 +480,54 @@ impl ExecutionGraph { }); } } + Ok(()) } /// Connects `from.output` into `to.input`. /// /// If `to` declares duplicate input names, all slots matching `to.input` are connected. + /// + /// Returns [`GraphError::BadNodeId`] for an unknown source or target node, + /// [`GraphError::UnknownOutput`] for an output name not produced by the source node, or + /// [`GraphError::UnknownInput`] for an input name not declared by the target node. pub fn connect( &mut self, from: NodeId, output: impl Into>, to: NodeId, input: impl Into>, - ) { + ) -> Result<(), GraphError> { let output: Box = output.into(); let input: Box = input.into(); - let Ok(index) = usize::try_from(to.as_u64()) else { - return; - }; - // Validate target node exists before interning to avoid memory churn on bad inputs. - if self.nodes.get(index).is_none() { - return; + let from_index = usize::try_from(from.as_u64()).map_err(|_| GraphError::BadNodeId)?; + let from_node = self.nodes.get(from_index).ok_or(GraphError::BadNodeId)?; + if !from_node + .output_names + .iter() + .any(|candidate| candidate.as_ref() == output.as_ref()) + { + return Err(GraphError::UnknownOutput { + node: from, + name: output, + }); } + let to_index = usize::try_from(to.as_u64()).map_err(|_| GraphError::BadNodeId)?; + let slots = self + .nodes + .get(to_index) + .ok_or(GraphError::BadNodeId)? + .input_slots + .get(input.as_ref()) + .cloned() + .ok_or(GraphError::UnknownInput { + node: to, + name: input, + })?; let read_id = self .dirty .intern(ResourceKey::node_output(from, output.clone())); - if let Some(n) = self.nodes.get_mut(index) - && let Some(slots) = n.input_slots.get(input.as_ref()) - { - for &slot in slots { + if let Some(n) = self.nodes.get_mut(to_index) { + for slot in slots { if let Some(binding) = n.inputs.get_mut(slot) { *binding = Some(Binding::FromNode { node: from, @@ -438,11 +543,8 @@ impl ExecutionGraph { // // This ensures initial runs are topologically ordered even before dependencies have been // observed dynamically. - let Ok(to_index) = usize::try_from(to.as_u64()) else { - return; - }; let Some(to_node) = self.nodes.get(to_index) else { - return; + return Err(GraphError::BadNodeId); }; let output_count = to_node.output_ids.len(); let src = read_id; @@ -451,6 +553,7 @@ impl ExecutionGraph { self.dirty.add_dependency(dst, src); self.dirty.mark_dirty(dst); } + Ok(()) } /// Marks an input key dirty (propagating to dependents after dependencies are established). @@ -841,6 +944,7 @@ impl ExecutionGraph { } fn execute_kind( + node: NodeId, kind: &mut NodeKind, vm: &mut Vm, ctx: &mut ExecutionContext, @@ -860,7 +964,7 @@ impl ExecutionGraph { trace, Some(tape_access), ) - .map_err(|_| GraphError::Trap), + .map_err(|trap| GraphError::Trap { node, trap }), } } @@ -952,6 +1056,7 @@ impl ExecutionGraph { )) }; Self::execute_kind( + node, &mut self.nodes[node_index].kind, &mut self.vm, &mut self.ctx, @@ -1047,6 +1152,7 @@ mod tests { use execution_tape::host::{HostContext, HostError, SigHash, ValueRef}; use execution_tape::host::{HostSig, ResourceKeyRef, sig_hash}; use execution_tape::program::ValueType; + use execution_tape::vm::Trap; use std::cell::RefCell; use std::collections::BTreeMap; use std::rc::Rc; @@ -1069,6 +1175,38 @@ mod tests { #[test] fn graph_error_display_includes_actionable_context() { + let bad_entry = GraphError::BadEntryFunc { func: FuncId(99) }.to_string(); + assert!(bad_entry.contains("f99")); + assert!(bad_entry.contains("not in the node program")); + + let bad_arity = GraphError::BadInputArity { + func: FuncId(1), + expected: 2, + actual: 1, + } + .to_string(); + assert!(bad_arity.contains("entry=f1")); + assert!(bad_arity.contains("expected 2 inputs")); + assert!(bad_arity.contains("got 1")); + + let unknown_input = GraphError::UnknownInput { + node: NodeId::new(5), + name: "qty".into(), + } + .to_string(); + assert!(unknown_input.contains("node=5")); + assert!(unknown_input.contains("input=qty")); + assert!(unknown_input.contains("add_node")); + + let unknown_output = GraphError::UnknownOutput { + node: NodeId::new(6), + name: "subtotal".into(), + } + .to_string(); + assert!(unknown_output.contains("node=6")); + assert!(unknown_output.contains("output=subtotal")); + assert!(unknown_output.contains("function output names")); + let missing_input = GraphError::MissingInput { node: NodeId::new(7), name: "subtotal".into(), @@ -1121,7 +1259,7 @@ mod tests { let a_prog = Arc::new(pb.build_verified().unwrap()); let mut g = ExecutionGraph::new(HostNoop, Limits::default()); - let na = g.add_node(a_prog, a_node, vec![]); + let na = g.add_node(a_prog, a_node, vec![]).unwrap(); g.run_all().unwrap(); let first = g.node_run_count(na).unwrap(); g.run_all().unwrap(); @@ -1154,24 +1292,25 @@ mod tests { let (b_prog, b_entry) = make_identity_program("value"); // Target chain: A -> B - let na = g.add_node(a_prog, a_entry, vec!["a".into()]); - let nb = g.add_node(b_prog, b_entry, vec!["b".into()]); - g.set_input_value(na, "a", Value::I64(1)); - g.connect(na, "value", nb, "b"); + let na = g.add_node(a_prog, a_entry, vec!["a".into()]).unwrap(); + let nb = g.add_node(b_prog, b_entry, vec!["b".into()]).unwrap(); + g.set_input_value(na, "a", Value::I64(1)).unwrap(); + g.connect(na, "value", nb, "b").unwrap(); // Many unrelated chains: X_i -> Y_i let mut unrelated_leaves: Vec = Vec::new(); for i in 0..32_u64 { let (x_prog, x_entry) = make_identity_program("value"); let (y_prog, y_entry) = make_identity_program("value"); - let nx = g.add_node(x_prog, x_entry, vec!["x".into()]); - let ny = g.add_node(y_prog, y_entry, vec!["y".into()]); + let nx = g.add_node(x_prog, x_entry, vec!["x".into()]).unwrap(); + let ny = g.add_node(y_prog, y_entry, vec!["y".into()]).unwrap(); g.set_input_value( nx, "x", Value::I64(10 + i64::try_from(i).unwrap_or(i64::MAX)), - ); - g.connect(nx, "value", ny, "y"); + ) + .unwrap(); + g.connect(nx, "value", ny, "y").unwrap(); unrelated_leaves.push(ny); } @@ -1182,7 +1321,7 @@ mod tests { } // Dirty target chain and all unrelated chains. - g.set_input_value(na, "a", Value::I64(2)); + g.set_input_value(na, "a", Value::I64(2)).unwrap(); g.invalidate_input("a"); // This invalidates the shared input key for all unrelated chains. The key property we @@ -1230,14 +1369,14 @@ mod tests { let (a_prog, a_entry) = make_identity_program("value"); let (b_prog, b_entry) = make_identity_program("value"); - let na = g.add_node(a_prog, a_entry, vec!["a".into()]); - let nb = g.add_node(b_prog, b_entry, vec!["b".into()]); - g.set_input_value(na, "a", Value::I64(1)); - g.connect(na, "value", nb, "b"); + let na = g.add_node(a_prog, a_entry, vec!["a".into()]).unwrap(); + let nb = g.add_node(b_prog, b_entry, vec!["b".into()]).unwrap(); + g.set_input_value(na, "a", Value::I64(1)).unwrap(); + g.connect(na, "value", nb, "b").unwrap(); g.run_all().unwrap(); - g.set_input_value(na, "a", Value::I64(2)); + g.set_input_value(na, "a", Value::I64(2)).unwrap(); g.invalidate_input("a"); let r = g.run_node_with_report(nb, ReportDetailMask::FULL).unwrap(); @@ -1303,10 +1442,10 @@ mod tests { let (a_prog, a_entry) = make_identity_program("value"); let (b_prog, b_entry) = make_identity_program("value"); - let na = g.add_node(a_prog, a_entry, vec!["a".into()]); - let nb = g.add_node(b_prog, b_entry, vec!["b".into()]); - g.set_input_value(na, "a", Value::I64(1)); - g.connect(na, "value", nb, "b"); + let na = g.add_node(a_prog, a_entry, vec!["a".into()]).unwrap(); + let nb = g.add_node(b_prog, b_entry, vec!["b".into()]).unwrap(); + g.set_input_value(na, "a", Value::I64(1)).unwrap(); + g.connect(na, "value", nb, "b").unwrap(); let first = g.run_all().unwrap(); assert_eq!(first.executed_nodes, 2); @@ -1338,13 +1477,13 @@ mod tests { let (a_prog, a_entry) = make_identity_program("value"); let (b_prog, b_entry) = make_identity_program("value"); - let na = g.add_node(a_prog, a_entry, vec!["a".into()]); - let nb = g.add_node(b_prog, b_entry, vec!["b".into()]); - g.set_input_value(na, "a", Value::I64(1)); - g.connect(na, "value", nb, "b"); + let na = g.add_node(a_prog, a_entry, vec!["a".into()]).unwrap(); + let nb = g.add_node(b_prog, b_entry, vec!["b".into()]).unwrap(); + g.set_input_value(na, "a", Value::I64(1)).unwrap(); + g.connect(na, "value", nb, "b").unwrap(); g.run_all().unwrap(); - g.set_input_value(na, "a", Value::I64(2)); + g.set_input_value(na, "a", Value::I64(2)).unwrap(); g.invalidate_input("a"); let minimal = g.run_node_with_report(nb, ReportDetailMask::NONE).unwrap(); @@ -1354,7 +1493,7 @@ mod tests { assert!(e.why_path.is_none()); } - g.set_input_value(na, "a", Value::I64(3)); + g.set_input_value(na, "a", Value::I64(3)).unwrap(); g.invalidate_input("a"); let because_only = g @@ -1417,7 +1556,7 @@ mod tests { let prog = Arc::new(pb.build_verified().unwrap()); let mut g = ExecutionGraph::new(HostNoAccess, Limits::default()); - let n = g.add_node(prog, f, vec![]); + let n = g.add_node(prog, f, vec![]).unwrap(); g.set_strict_deps(true); assert_eq!( @@ -1451,7 +1590,7 @@ mod tests { let prog = Arc::new(pb.build_verified().unwrap()); let mut g = ExecutionGraph::new(HostNoop, Limits::default()); - let n = g.add_node(prog, f, vec!["in".into()]); + let n = g.add_node(prog, f, vec!["in".into()]).unwrap(); assert_eq!( g.run_all(), @@ -1463,7 +1602,115 @@ mod tests { } #[test] - fn run_all_errors_on_missing_upstream_output() { + fn run_all_preserves_vm_trap_info() { + let mut pb = ProgramBuilder::new(); + let mut a = Asm::new(); + a.const_i64(1, 1); + a.const_i64(2, 0); + a.i64_div(3, 1, 2); + a.ret(0, &[3]); + let f = pb + .push_function_checked( + a, + FunctionSig { + arg_types: vec![], + ret_types: vec![ValueType::I64], + }, + ) + .unwrap(); + pb.set_function_output_name(f, 0, "value").unwrap(); + let prog = Arc::new(pb.build_verified().unwrap()); + + let mut g = ExecutionGraph::new(HostNoop, Limits::default()); + let n = g.add_node(prog, f, vec![]).unwrap(); + + let Err(GraphError::Trap { node, trap }) = g.run_all() else { + panic!("divide-by-zero should surface as a graph trap"); + }; + assert_eq!(node, n); + assert_eq!(trap.func, f); + assert_eq!(trap.trap, Trap::DivByZero); + } + + #[test] + fn graph_builder_errors_on_bad_entry_func() { + let mut pb = ProgramBuilder::new(); + let mut a = Asm::new(); + a.ret(0, &[]); + pb.push_function_checked( + a, + FunctionSig { + arg_types: vec![], + ret_types: vec![], + }, + ) + .unwrap(); + let prog = Arc::new(pb.build_verified().unwrap()); + + let mut g = ExecutionGraph::new(HostNoop, Limits::default()); + assert_eq!( + g.add_node(prog, FuncId(99), vec![]), + Err(GraphError::BadEntryFunc { func: FuncId(99) }) + ); + } + + #[test] + fn graph_builder_errors_on_input_arity_mismatch() { + let mut pb = ProgramBuilder::new(); + let mut a = Asm::new(); + a.ret(0, &[1]); + let f = pb + .push_function_checked( + a, + FunctionSig { + arg_types: vec![ValueType::I64], + ret_types: vec![ValueType::I64], + }, + ) + .unwrap(); + let prog = Arc::new(pb.build_verified().unwrap()); + + let mut g = ExecutionGraph::new(HostNoop, Limits::default()); + assert_eq!( + g.add_node(prog, f, vec![]), + Err(GraphError::BadInputArity { + func: f, + expected: 1, + actual: 0, + }) + ); + } + + #[test] + fn set_input_value_errors_on_unknown_input() { + let mut pb = ProgramBuilder::new(); + let mut a = Asm::new(); + a.ret(0, &[1]); + let f = pb + .push_function_checked( + a, + FunctionSig { + arg_types: vec![ValueType::I64], + ret_types: vec![ValueType::I64], + }, + ) + .unwrap(); + let prog = Arc::new(pb.build_verified().unwrap()); + + let mut g = ExecutionGraph::new(HostNoop, Limits::default()); + let n = g.add_node(prog, f, vec!["qty".into()]).unwrap(); + + assert_eq!( + g.set_input_value(n, "unit_price", Value::I64(10)), + Err(GraphError::UnknownInput { + node: n, + name: "unit_price".into(), + }) + ); + } + + #[test] + fn connect_errors_on_unknown_names() { fn make_const_program(output_name: &str, v: i64) -> (Arc, FuncId) { let mut pb = ProgramBuilder::new(); let mut a = Asm::new(); @@ -1503,16 +1750,21 @@ mod tests { let (b_prog, b_entry) = make_identity_program("value"); let mut g = ExecutionGraph::new(HostNoop, Limits::default()); - let na = g.add_node(a_prog, a_entry, vec![]); - let nb = g.add_node(b_prog, b_entry, vec!["x".into()]); - - // Wire a non-existent output name. - g.connect(na, "does_not_exist", nb, "x"); + let na = g.add_node(a_prog, a_entry, vec![]).unwrap(); + let nb = g.add_node(b_prog, b_entry, vec!["x".into()]).unwrap(); assert_eq!( - g.run_all(), - Err(GraphError::MissingUpstreamOutput { + g.connect(na, "does_not_exist", nb, "x"), + Err(GraphError::UnknownOutput { node: na, + name: "does_not_exist".into(), + }) + ); + + assert_eq!( + g.connect(na, "value", nb, "does_not_exist"), + Err(GraphError::UnknownInput { + node: nb, name: "does_not_exist".into() }) ); @@ -1589,7 +1841,7 @@ mod tests { }; let mut g = ExecutionGraph::new(host, Limits::default()); - let n = g.add_node(prog, f, vec![]); + let n = g.add_node(prog, f, vec![]).unwrap(); g.run_all().unwrap(); assert_eq!( @@ -1689,7 +1941,7 @@ mod tests { }, Limits::default(), ); - let n = g.add_node(prog, f, vec![]); + let n = g.add_node(prog, f, vec![]).unwrap(); g.run_all().unwrap(); let first_ids = g.nodes[usize::try_from(n.as_u64()).unwrap()] @@ -1773,7 +2025,7 @@ mod tests { }; let mut g = ExecutionGraph::new(host, Limits::default()); - let n = g.add_node(prog, f, vec![]); + let n = g.add_node(prog, f, vec![]).unwrap(); g.run_all().unwrap(); assert_eq!( @@ -1818,13 +2070,13 @@ mod tests { let (c_prog, c_entry) = make_identity_program("value"); let mut g = ExecutionGraph::new(HostNoop, Limits::default()); - let na = g.add_node(a_prog, a_entry, vec!["in".into()]); - let nb = g.add_node(b_prog, b_entry, vec!["x".into()]); - let nc = g.add_node(c_prog, c_entry, vec!["y".into()]); + let na = g.add_node(a_prog, a_entry, vec!["in".into()]).unwrap(); + let nb = g.add_node(b_prog, b_entry, vec!["x".into()]).unwrap(); + let nc = g.add_node(c_prog, c_entry, vec!["y".into()]).unwrap(); - g.set_input_value(na, "in", Value::I64(7)); - g.connect(na, "value", nb, "x"); - g.connect(nb, "value", nc, "y"); + g.set_input_value(na, "in", Value::I64(7)).unwrap(); + g.connect(na, "value", nb, "x").unwrap(); + g.connect(nb, "value", nc, "y").unwrap(); g.run_all().unwrap(); assert_eq!( @@ -1842,7 +2094,7 @@ mod tests { assert_eq!(g.node_run_count(nc), Some(1)); // Change the external input and invalidate its key. - g.set_input_value(na, "in", Value::I64(8)); + g.set_input_value(na, "in", Value::I64(8)).unwrap(); g.invalidate_input("in"); g.run_all().unwrap(); @@ -1896,19 +2148,25 @@ mod tests { let (b_prog, b_entry) = make_const_program("value", 9); let mut g = ExecutionGraph::new(HostNoop, Limits::default()); - let na = g.add_node(a_prog, a_entry, vec!["in".into()]); - let nb = g.add_node(b_prog, b_entry, vec![]); - - // This creates conservative dirty edges from A -> B, but B has zero observed reads. - g.connect(na, "value", nb, "not_an_input"); - g.set_input_value(na, "in", Value::I64(1)); + let na = g.add_node(a_prog, a_entry, vec!["in".into()]).unwrap(); + let nb = g.add_node(b_prog, b_entry, vec![]).unwrap(); + + // Seed the same conservative dirty edge that `connect` creates before dynamic access + // refinement, but keep B input-free so its first run observes zero reads. + let na_index = usize::try_from(na.as_u64()).unwrap(); + let nb_index = usize::try_from(nb.as_u64()).unwrap(); + let src = g.nodes[na_index].output_ids[0]; + let dst = g.nodes[nb_index].output_ids[0]; + g.dirty.add_dependency(dst, src); + g.dirty.mark_dirty(dst); + g.set_input_value(na, "in", Value::I64(1)).unwrap(); g.run_all().unwrap(); assert_eq!(g.node_run_count(na), Some(1)); assert_eq!(g.node_run_count(nb), Some(1)); // If conservative deps were not replaced on first run, this would spuriously rerun B. - g.set_input_value(na, "in", Value::I64(2)); + g.set_input_value(na, "in", Value::I64(2)).unwrap(); g.invalidate_input("in"); g.run_all().unwrap(); @@ -1942,8 +2200,8 @@ mod tests { let prog = Arc::new(pb.build_verified().unwrap()); let mut g = ExecutionGraph::new(HostNoop, Limits::default()); - let n = g.add_node(prog, f, vec!["x".into(), "x".into()]); - g.set_input_value(n, "x", Value::I64(7)); + let n = g.add_node(prog, f, vec!["x".into(), "x".into()]).unwrap(); + g.set_input_value(n, "x", Value::I64(7)).unwrap(); g.run_all().unwrap(); assert_eq!( @@ -1975,7 +2233,7 @@ mod tests { let mut g = ExecutionGraph::new(HostNoop, Limits::default()); g.set_collect_access_log(true); - let n = g.add_node(prog, f, vec![]); + let n = g.add_node(prog, f, vec![]).unwrap(); g.run_all().unwrap(); let log = g.node_last_access(n); @@ -2006,8 +2264,8 @@ mod tests { let (prog, entry) = make_identity_program("value"); let mut g = ExecutionGraph::new(HostNoop, Limits::default()); - let n = g.add_node(prog, entry, vec!["in".into()]); - g.set_input_value(n, "in", Value::I64(1)); + let n = g.add_node(prog, entry, vec!["in".into()]).unwrap(); + g.set_input_value(n, "in", Value::I64(1)).unwrap(); // Run with collection enabled — should produce a log. g.set_collect_access_log(true); @@ -2016,7 +2274,7 @@ mod tests { // Disable collection, rerun — stale log must be cleared. g.set_collect_access_log(false); - g.set_input_value(n, "in", Value::I64(2)); + g.set_input_value(n, "in", Value::I64(2)).unwrap(); g.invalidate_input("in"); g.run_all().unwrap(); assert!( @@ -2046,10 +2304,10 @@ mod tests { let (prog, entry) = make_identity_program("value"); let mut g = ExecutionGraph::new(HostNoop, Limits::default()); - let n = g.add_node(prog, entry, vec!["in".into()]); + let n = g.add_node(prog, entry, vec!["in".into()]).unwrap(); // First run populates the output map. - g.set_input_value(n, "in", Value::I64(10)); + g.set_input_value(n, "in", Value::I64(10)).unwrap(); g.run_all().unwrap(); assert_eq!( g.node_outputs(n).unwrap().get("value"), @@ -2057,7 +2315,7 @@ mod tests { ); // Second run uses in-place update path. - g.set_input_value(n, "in", Value::I64(20)); + g.set_input_value(n, "in", Value::I64(20)).unwrap(); g.invalidate_input("in"); g.run_all().unwrap(); assert_eq!( @@ -2066,7 +2324,7 @@ mod tests { ); // Third run confirms stability. - g.set_input_value(n, "in", Value::I64(30)); + g.set_input_value(n, "in", Value::I64(30)).unwrap(); g.invalidate_input("in"); g.run_all().unwrap(); assert_eq!( diff --git a/execution_graph/src/pretty.rs b/execution_graph/src/pretty.rs index 5852f8a..ef72295 100644 --- a/execution_graph/src/pretty.rs +++ b/execution_graph/src/pretty.rs @@ -73,8 +73,7 @@ impl ExecutionGraph { /// Renders the graph as Graphviz DOT. /// /// Nodes are rendered as record-shaped boxes with one input and output port per declared - /// argument/return. If a connection references a non-existent upstream output name, the edge - /// is still emitted as a dashed fallback edge so broken wiring remains visible. + /// argument/return. #[must_use] pub fn to_dot(&self) -> String { let mut dot = String::from( @@ -206,10 +205,12 @@ mod tests { let (a_prog, a_entry) = make_identity_program("subtotal"); let (b_prog, b_entry) = make_identity_program("total"); - let na = g.add_node(a_prog, a_entry, vec!["qty".into()]); - let nb = g.add_node(b_prog, b_entry, vec!["subtotal".into()]); - g.set_input_value(na, "qty", Value::I64(2)); - g.connect(na, "subtotal", nb, "subtotal"); + let na = g.add_node(a_prog, a_entry, vec!["qty".into()]).unwrap(); + let nb = g + .add_node(b_prog, b_entry, vec!["subtotal".into()]) + .unwrap(); + g.set_input_value(na, "qty", Value::I64(2)).unwrap(); + g.connect(na, "subtotal", nb, "subtotal").unwrap(); let dot = g.to_dot(); @@ -220,24 +221,6 @@ mod tests { assert!(dot.contains("n0:out0 -> n1:in0;")); } - #[test] - fn to_dot_marks_unknown_upstream_outputs_with_dashed_edges() { - let mut g = ExecutionGraph::new(HostNoop, Limits::default()); - let (a_prog, a_entry) = make_identity_program("subtotal"); - let (b_prog, b_entry) = make_identity_program("total"); - - let na = g.add_node(a_prog, a_entry, vec!["qty".into()]); - let nb = g.add_node(b_prog, b_entry, vec!["subtotal".into()]); - g.set_input_value(na, "qty", Value::I64(2)); - g.connect(na, "missing", nb, "subtotal"); - - let dot = g.to_dot(); - - assert!( - dot.contains("n0 -> n1:in0 [label=\"missing\", style=dashed, color=\"firebrick\"];") - ); - } - #[test] fn to_dot_includes_program_and_function_names_when_available() { let mut pb = ProgramBuilder::new(); @@ -258,8 +241,8 @@ mod tests { let prog = Arc::new(pb.build_verified().unwrap()); let mut g = ExecutionGraph::new(HostNoop, Limits::default()); - let n = g.add_node(prog, f, vec!["x".into()]); - g.set_input_value(n, "x", Value::I64(1)); + let n = g.add_node(prog, f, vec!["x".into()]).unwrap(); + g.set_input_value(n, "x", Value::I64(1)).unwrap(); let dot = g.to_dot(); assert!(dot.contains("node#0 (named_program)")); diff --git a/execution_graph_examples/src/bin/tax.rs b/execution_graph_examples/src/bin/tax.rs index 3d1e0d9..2916d5b 100644 --- a/execution_graph_examples/src/bin/tax.rs +++ b/execution_graph_examples/src/bin/tax.rs @@ -172,9 +172,9 @@ fn main() -> Result<(), GraphError> { let (t_prog, t_entry) = program_tax_amount(); let (sum_prog, sum_entry) = program_total(); - let n_price = g.add_node(p_prog, p_entry, vec!["qty".into(), "unit_price".into()]); - let n_tax = g.add_node(t_prog, t_entry, vec!["subtotal".into()]); - let n_total = g.add_node(sum_prog, sum_entry, vec!["subtotal".into(), "tax".into()]); + let n_price = g.add_node(p_prog, p_entry, vec!["qty".into(), "unit_price".into()])?; + let n_tax = g.add_node(t_prog, t_entry, vec!["subtotal".into()])?; + let n_total = g.add_node(sum_prog, sum_entry, vec!["subtotal".into(), "tax".into()])?; let node_labels: BTreeMap = BTreeMap::from([ (n_price.as_u64(), label_for(n_price)), @@ -182,12 +182,12 @@ fn main() -> Result<(), GraphError> { (n_total.as_u64(), label_for(n_total)), ]); - g.set_input_value(n_price, "qty", Value::I64(2)); - g.set_input_value(n_price, "unit_price", Value::I64(120)); + g.set_input_value(n_price, "qty", Value::I64(2))?; + g.set_input_value(n_price, "unit_price", Value::I64(120))?; - g.connect(n_price, "subtotal", n_tax, "subtotal"); - g.connect(n_price, "subtotal", n_total, "subtotal"); - g.connect(n_tax, "tax", n_total, "tax"); + g.connect(n_price, "subtotal", n_tax, "subtotal")?; + g.connect(n_price, "subtotal", n_total, "subtotal")?; + g.connect(n_tax, "tax", n_total, "tax")?; if emit_dot { g.run_all()?; @@ -205,7 +205,7 @@ fn main() -> Result<(), GraphError> { // Change 1: the order quantity changes. println!(); println!("🟦 change: qty"); - g.set_input_value(n_price, "qty", Value::I64(3)); + g.set_input_value(n_price, "qty", Value::I64(3))?; g.invalidate_input("qty"); let report = g.run_all_with_report(ReportDetailMask::FULL)?; diff --git a/execution_graph_wind_tunnel/benches/graph.rs b/execution_graph_wind_tunnel/benches/graph.rs index 6019d34..4038f4a 100644 --- a/execution_graph_wind_tunnel/benches/graph.rs +++ b/execution_graph_wind_tunnel/benches/graph.rs @@ -72,13 +72,13 @@ fn build_chain_graph(len: usize) -> (ExecutionGraph, execution_graph::N let (prog, entry) = build_identity_program("value"); let mut g = ExecutionGraph::new(NopHost, Limits::default()); - let n0 = g.add_node(prog.clone(), entry, vec!["in".into()]); - g.set_input_value(n0, "in", Value::I64(1)); + let n0 = g.add_node(prog.clone(), entry, vec!["in".into()]).unwrap(); + g.set_input_value(n0, "in", Value::I64(1)).unwrap(); let mut prev = n0; for _ in 1..len { - let n = g.add_node(prog.clone(), entry, vec!["x".into()]); - g.connect(prev, "value", n, "x"); + let n = g.add_node(prog.clone(), entry, vec!["x".into()]).unwrap(); + g.connect(prev, "value", n, "x").unwrap(); prev = n; } @@ -115,11 +115,12 @@ fn build_stable_deps_graph(input_count: usize) -> ExecutionGraph { let input_names: Vec> = (0..input_count) .map(|i| format!("in{i}").into_boxed_str()) .collect(); - let node = g.add_node(prog, entry, input_names.clone()); + let node = g.add_node(prog, entry, input_names.clone()).unwrap(); for (i, name) in input_names.iter().enumerate() { let value = i64::try_from(i).unwrap_or(i64::MAX); - g.set_input_value(node, name.as_ref(), Value::I64(value)); + g.set_input_value(node, name.as_ref(), Value::I64(value)) + .unwrap(); } g.run_all().unwrap(); @@ -140,15 +141,16 @@ fn build_stable_deps_graph(input_count: usize) -> ExecutionGraph { fn bench_single_node_rerun(c: &mut Criterion) { let (prog, entry) = build_identity_program("value"); let mut g = ExecutionGraph::new(NopHost, Limits::default()); - let n0 = g.add_node(prog, entry, vec!["in".into()]); - g.set_input_value(n0, "in", Value::I64(0)); + let n0 = g.add_node(prog, entry, vec!["in".into()]).unwrap(); + g.set_input_value(n0, "in", Value::I64(0)).unwrap(); g.run_all().unwrap(); c.bench_function("single_node_rerun", |b| { let mut v = 0_i64; b.iter(|| { v = v.wrapping_add(1); - g.set_input_value(n0, "in", Value::I64(black_box(v))); + g.set_input_value(n0, "in", Value::I64(black_box(v))) + .unwrap(); g.invalidate_input("in"); g.run_all().unwrap(); }); @@ -173,7 +175,8 @@ fn bench_chain_rerun(c: &mut Criterion) { let mut v = 0_i64; b.iter(|| { v = v.wrapping_add(1); - g.set_input_value(n0, "in", Value::I64(black_box(v))); + g.set_input_value(n0, "in", Value::I64(black_box(v))) + .unwrap(); g.invalidate_input("in"); g.run_all().unwrap(); }); @@ -316,7 +319,7 @@ fn build_stable_deps_flapping_order_graph( flip: Rc::new(RefCell::new(false)), }; let mut g = ExecutionGraph::new(host, Limits::default()); - g.add_node(prog, entry, vec![]); + g.add_node(prog, entry, vec![]).unwrap(); g.run_all().unwrap(); (g, op) } @@ -347,12 +350,12 @@ fn build_fanout_graph(fanout: usize) -> (ExecutionGraph, execution_grap let (prog, entry) = build_identity_program("value"); let mut g = ExecutionGraph::new(NopHost, Limits::default()); - let root = g.add_node(prog.clone(), entry, vec!["in".into()]); - g.set_input_value(root, "in", Value::I64(1)); + let root = g.add_node(prog.clone(), entry, vec!["in".into()]).unwrap(); + g.set_input_value(root, "in", Value::I64(1)).unwrap(); for _ in 0..fanout { - let leaf = g.add_node(prog.clone(), entry, vec!["x".into()]); - g.connect(root, "value", leaf, "x"); + let leaf = g.add_node(prog.clone(), entry, vec!["x".into()]).unwrap(); + g.connect(root, "value", leaf, "x").unwrap(); } g.run_all().unwrap(); @@ -379,7 +382,8 @@ fn bench_fanout_rerun(c: &mut Criterion) { let mut v = 0_i64; b.iter(|| { v = v.wrapping_add(1); - g.set_input_value(root, "in", Value::I64(black_box(v))); + g.set_input_value(root, "in", Value::I64(black_box(v))) + .unwrap(); g.invalidate_input("in"); g.run_all().unwrap(); }); @@ -468,17 +472,22 @@ fn build_disjoint_chains( let (id_prog, id_entry) = build_identity_program("value"); for i in 0..chains { - let root = g.add_node(param_prog.clone(), param_entry, vec!["key".into()]); + let root = g + .add_node(param_prog.clone(), param_entry, vec!["key".into()]) + .unwrap(); g.set_input_value( root, "key", Value::U64(u64::try_from(i).unwrap_or(u64::MAX)), - ); + ) + .unwrap(); let mut prev = root; for _ in 1..chain_len { - let n = g.add_node(id_prog.clone(), id_entry, vec!["x".into()]); - g.connect(prev, "value", n, "x"); + let n = g + .add_node(id_prog.clone(), id_entry, vec!["x".into()]) + .unwrap(); + g.connect(prev, "value", n, "x").unwrap(); prev = n; } } @@ -567,25 +576,34 @@ fn build_shared_upstream( let (add_prog, add_entry) = build_add2_program("value"); let (id_prog, id_entry) = build_identity_program("value"); - let global = g.add_node(param_prog.clone(), param_entry, vec!["key".into()]); - g.set_input_value(global, "key", Value::U64(0)); + let global = g + .add_node(param_prog.clone(), param_entry, vec!["key".into()]) + .unwrap(); + g.set_input_value(global, "key", Value::U64(0)).unwrap(); for i in 0..tenants { - let per = g.add_node(param_prog.clone(), param_entry, vec!["key".into()]); + let per = g + .add_node(param_prog.clone(), param_entry, vec!["key".into()]) + .unwrap(); g.set_input_value( per, "key", Value::U64(u64::try_from(i + 1).unwrap_or(u64::MAX)), - ); + ) + .unwrap(); - let base = g.add_node(add_prog.clone(), add_entry, vec!["a".into(), "b".into()]); - g.connect(global, "value", base, "a"); - g.connect(per, "value", base, "b"); + let base = g + .add_node(add_prog.clone(), add_entry, vec!["a".into(), "b".into()]) + .unwrap(); + g.connect(global, "value", base, "a").unwrap(); + g.connect(per, "value", base, "b").unwrap(); let mut prev = base; for _ in 1..chain_len { - let n = g.add_node(id_prog.clone(), id_entry, vec!["x".into()]); - g.connect(prev, "value", n, "x"); + let n = g + .add_node(id_prog.clone(), id_entry, vec!["x".into()]) + .unwrap(); + g.connect(prev, "value", n, "x").unwrap(); prev = n; } } @@ -687,19 +705,24 @@ fn build_layered_dag( let mut prev: Vec = Vec::with_capacity(width); for i in 0..width { - let n = g.add_node(param_prog.clone(), param_entry, vec!["key".into()]); - g.set_input_value(n, "key", Value::U64(u64::try_from(i).unwrap_or(u64::MAX))); + let n = g + .add_node(param_prog.clone(), param_entry, vec!["key".into()]) + .unwrap(); + g.set_input_value(n, "key", Value::U64(u64::try_from(i).unwrap_or(u64::MAX))) + .unwrap(); prev.push(n); } for _ in 1..layers { let mut next: Vec = Vec::with_capacity(width); for i in 0..width { - let n = g.add_node(add_prog.clone(), add_entry, vec!["a".into(), "b".into()]); + let n = g + .add_node(add_prog.clone(), add_entry, vec!["a".into(), "b".into()]) + .unwrap(); let a0 = prev[i]; let b0 = prev[(i + 1) % width]; - g.connect(a0, "value", n, "a"); - g.connect(b0, "value", n, "b"); + g.connect(a0, "value", n, "a").unwrap(); + g.connect(b0, "value", n, "b").unwrap(); next.push(n); } prev = next;