diff --git a/pipeline/src/main/scala/com/kakao/actionbase/pipeline/dsl/Job.scala b/pipeline/src/main/scala/com/kakao/actionbase/pipeline/dsl/Job.scala new file mode 100644 index 00000000..f4fd9d88 --- /dev/null +++ b/pipeline/src/main/scala/com/kakao/actionbase/pipeline/dsl/Job.scala @@ -0,0 +1,76 @@ +package com.kakao.actionbase.pipeline.dsl + +import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper} +import com.fasterxml.jackson.module.scala.{ClassTagExtensions, DefaultScalaModule} +import org.apache.spark.sql.SparkSession + +import scala.reflect.ClassTag + +/** One spark-submit unit. Subclass and implement `plan(cfg)`. + * + * Two entry points share the same Cfg-binding mapper: + * - `main(argv)` — for spark-submit / CLI: parses `--key=value` argv into Cfg. + * - `planFromMap(args)` — for in-process runners: binds an arbitrary YAML-shaped Map onto Cfg, supporting nested + * fields like `Seq[StepSpec]` that argv cannot represent. + * + * {{{ + * case class MyConfig(in: String, out: String) + * + * object MyJob extends Job[MyConfig] { + * override def plan(cfg: MyConfig): Plan.Closed = + * FileSource(cfg.in, "parquet") ~> MyTransform() ~> FileSink(cfg.out, "parquet") + * } + * }}} + */ +abstract class Job[C <: Product: ClassTag] { + + def plan(cfg: C): Plan.Closed + + /** Hook for subclasses to tweak the SparkSession builder (master, configs). */ + protected def configure(builder: SparkSession.Builder): SparkSession.Builder = builder + + /** Bind `args` onto Cfg via Jackson and return the resulting Plan. Used by runners that already manage Spark. */ + def planFromMap(args: Map[String, Any]): Plan.Closed = { + val cls = implicitly[ClassTag[C]].runtimeClass.asInstanceOf[Class[C]] + val cfg = Job.mapper.convertValue(args, cls) + plan(cfg) + } + + def main(argv: Array[String]): Unit = { + val args = Job.parseArgv(argv) + println(s"Running ${getClass.getSimpleName}: $args") + + val spark = configure( + SparkSession.builder().appName(getClass.getCanonicalName.stripSuffix("$")) + ).getOrCreate() + + try planFromMap(args).run()(spark) + finally { + println("Stopping Spark session...") + spark.stop() + } + } +} + +object Job { + + // Strict mapper: missing required primitives must throw rather than silently + // become 0/false. Unknown keys are tolerated so `--extra=...` doesn't fail. + @transient private[pipeline] lazy val mapper: ObjectMapper with ClassTagExtensions = { + val m = new ObjectMapper() with ClassTagExtensions + m.registerModule(DefaultScalaModule) + m.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) + m.configure(DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES, true) + m + } + + private def parseArgv(argv: Array[String]): Map[String, Any] = + argv.iterator.flatMap { s => + if (!s.startsWith("--")) None + else + s.drop(2).split("=", 2) match { + case Array(k, v) => Some(k -> v) + case _ => None + } + }.toMap +} diff --git a/pipeline/src/main/scala/com/kakao/actionbase/pipeline/dsl/Plan.scala b/pipeline/src/main/scala/com/kakao/actionbase/pipeline/dsl/Plan.scala new file mode 100644 index 00000000..b60ba166 --- /dev/null +++ b/pipeline/src/main/scala/com/kakao/actionbase/pipeline/dsl/Plan.scala @@ -0,0 +1,132 @@ +package com.kakao.actionbase.pipeline.dsl + +import org.apache.spark.sql.{DataFrame, SparkSession} + +import scala.collection.mutable + +// Type-state wrapper around the internal Ast. +// Open = chain ends in a Source or Transform; not yet runnable. +// Closed = chain ends in a Sink (or Fork of Sinks); runnable. +sealed trait Plan { + private[dsl] def ast: Ast +} + +object Plan { + + final class Open private[dsl] ( + private[dsl] val ast: Ast, + private[dsl] val edgeLabel: String = "_0" + ) extends Plan { + + /** Name this step's output for the consumer's view of it (e.g., the temp view name a `SqlTransform` will see). + * Defaults to `"_0"` if not called. + */ + def as(label: String): Open = new Open(ast, label) + + def ~>(t: Transform): Open = new Open(Ast.Tx(Seq(edgeLabel -> ast), t)) + + def ~>(s: Sink): Closed = new Closed(Ast.Snk(ast, s)) + + /** Combine with another labeled output to feed a multi-input Transform: + * + * `(users.as("u") + events.as("e")) ~> SqlTransform("... FROM u JOIN e ...")`. + */ + def +(other: Open): MultiOpen = + new MultiOpen(Seq(edgeLabel -> ast, other.edgeLabel -> other.ast)) + + def fanOut(branches: (Open => Closed)*): Closed = { + require(branches.nonEmpty, "fanOut requires at least one branch") + val branchAsts = branches.map { b => + val branchOpen = new Open(Ast.Ref) + b(branchOpen).ast + } + new Closed(Ast.Fork(ast, branchAsts)) + } + } + + /** Two or more labeled outputs combined for a multi-input Transform. Build via `Open.+`; finish with `~>`. */ + final class MultiOpen private[dsl] (private[dsl] val parts: Seq[(String, Ast)]) { + def +(other: Open): MultiOpen = new MultiOpen(parts :+ (other.edgeLabel -> other.ast)) + def ~>(t: Transform): Open = new Open(Ast.Tx(parts, t)) + } + + final class Closed private[dsl] (private[dsl] val ast: Ast) extends Plan { + def run()(implicit spark: SparkSession): Unit = Executor.run(ast) + } + + /** Escape hatch for runners that build the AST directly (e.g., `StepsBuilder` assembling a DAG from YAML). */ + private[pipeline] def closed(ast: Ast): Closed = new Closed(ast) + private[pipeline] def open(ast: Ast): Open = new Open(ast) +} + +// Internal AST. Hidden from users so the DSL surface stays small. +private[pipeline] sealed trait Ast +private[pipeline] object Ast { + case class Src(s: Source) extends Ast + case class Tx(upstreams: Seq[(String, Ast)], t: Transform) extends Ast + case class Snk(upstream: Ast, s: Sink) extends Ast + case class Fork(upstream: Ast, branches: Seq[Ast]) extends Ast + // Multiple terminal roots executed under a shared materialization memo, so a common upstream is built once even + // when several sinks consume it. + case class Group(roots: Seq[Ast]) extends Ast + case object Ref extends Ast +} + +private[dsl] object Executor { + + def run(ast: Ast)(implicit spark: SparkSession): Unit = { + val memo = mutable.Map.empty[Ast, DataFrame] + runRoot(ast, memo) + } + + private def runRoot(ast: Ast, memo: mutable.Map[Ast, DataFrame])(implicit + spark: SparkSession + ): Unit = ast match { + case Ast.Snk(up, sink) => + sink.write(materialize(up, memo)) + + case Ast.Fork(up, branches) => + val df = materialize(up, memo).cache() + try branches.foreach(b => runBranch(b, df)) + finally df.unpersist(blocking = false) + + case Ast.Group(roots) => + roots.foreach(r => runRoot(r, memo)) + + case other => + throw new IllegalStateException( + s"Top-level Plan must end in a Sink, Fork, or Group, got: $other" + ) + } + + private def materialize(ast: Ast, memo: mutable.Map[Ast, DataFrame])(implicit + spark: SparkSession + ): DataFrame = memo.getOrElseUpdate( + ast, + ast match { + case Ast.Src(s) => s.read() + case Ast.Tx(ups, t) => + t.apply(ups.map { case (label, up) => label -> materialize(up, memo) }) + case Ast.Ref => + throw new IllegalStateException("Ast.Ref outside fanOut branch — invalid Plan") + case other => + throw new IllegalStateException(s"materialize cannot handle: $other") + } + ) + + private def runBranch(ast: Ast, root: DataFrame)(implicit spark: SparkSession): Unit = { + def go(p: Ast): DataFrame = p match { + case Ast.Ref => root + case Ast.Tx(ups, t) => t.apply(ups.map { case (label, up) => label -> go(up) }) + case other => + throw new IllegalStateException( + s"fanOut branch must be Transform*-then-Sink rooted at the fork, got: $other" + ) + } + ast match { + case Ast.Snk(up, sink) => sink.write(go(up)) + case other => + throw new IllegalStateException(s"fanOut branch must end in Sink, got: $other") + } + } +} diff --git a/pipeline/src/main/scala/com/kakao/actionbase/pipeline/dsl/Step.scala b/pipeline/src/main/scala/com/kakao/actionbase/pipeline/dsl/Step.scala new file mode 100644 index 00000000..8f870eba --- /dev/null +++ b/pipeline/src/main/scala/com/kakao/actionbase/pipeline/dsl/Step.scala @@ -0,0 +1,27 @@ +package com.kakao.actionbase.pipeline.dsl + +import org.apache.spark.sql.{DataFrame, SparkSession} + +sealed trait Step + +trait Source extends Step { + def read()(implicit spark: SparkSession): DataFrame +} + +/** A Transform consumes one or more labeled input DataFrames and produces one. Each input arrives as a `(label, df)` + * pair so consumers (e.g., `SqlTransform`) can use the label as a temp view name without re-stating it. The label is + * the producer's `as:` (or `"_0"` for a single-input chain default). + */ +trait Transform extends Step { + def apply(inputs: Seq[(String, DataFrame)])(implicit spark: SparkSession): DataFrame + + /** Single-input convenience for DSL `~>` and unit tests. The implicit label is `"_0"`. */ + def apply(in: DataFrame)(implicit spark: SparkSession): DataFrame = apply(Seq("_0" -> in)) +} + +/** A Sink consumes a single DataFrame. Multi-input fan-in isn't a Sink concern — wire multiple sinks as siblings via + * `fanOut(...)` (Scala DSL) or repeat the sink step under different `inputs:` (YAML). + */ +trait Sink extends Step { + def write(in: DataFrame)(implicit spark: SparkSession): Unit +} diff --git a/pipeline/src/main/scala/com/kakao/actionbase/pipeline/dsl/package.scala b/pipeline/src/main/scala/com/kakao/actionbase/pipeline/dsl/package.scala new file mode 100644 index 00000000..6ede19d1 --- /dev/null +++ b/pipeline/src/main/scala/com/kakao/actionbase/pipeline/dsl/package.scala @@ -0,0 +1,10 @@ +package com.kakao.actionbase.pipeline + +import scala.language.implicitConversions + +package object dsl { + // Lets a Source start a chain: `MySource(...) ~> MyTransform() ~> MySink(...)`. + // `~>` lives on Plan.Open, so this conversion bridges the gap. + implicit def sourceToOpen(s: Source): Plan.Open = + new Plan.Open(Ast.Src(s)) +} diff --git a/pipeline/src/main/scala/com/kakao/actionbase/pipeline/jobs/StepsRunnerJob.scala b/pipeline/src/main/scala/com/kakao/actionbase/pipeline/jobs/StepsRunnerJob.scala new file mode 100644 index 00000000..1cea0238 --- /dev/null +++ b/pipeline/src/main/scala/com/kakao/actionbase/pipeline/jobs/StepsRunnerJob.scala @@ -0,0 +1,29 @@ +package com.kakao.actionbase.pipeline.jobs + +import com.kakao.actionbase.pipeline.dsl.{Job, Plan} +import com.kakao.actionbase.pipeline.runner.StepsBuilder +import com.kakao.actionbase.pipeline.workflow.StepSpec + +/** Generic Job whose Plan is built at runtime from an inline `steps:` list in its Cfg. Lets a workflow YAML express a + * Source ~> Transform* ~> Sink chain without requiring a hand-written Job class: + * + * {{{ + * jobs: + * pi: + * kind: spark + * artifact: "com.kakao.actionbase:pipeline:0.x" + * mainClass: StepsRunnerJob + * args: + * steps: + * - step: SampleSource + * args: { n: 1000000, columns: [x, y] } + * - step: SqlTransform + * args: { query: "SELECT ... FROM _0" } + * - step: ShowSink + * }}} + */ +case class StepsRunnerCfg(steps: Seq[StepSpec]) + +object StepsRunnerJob extends Job[StepsRunnerCfg] { + override def plan(cfg: StepsRunnerCfg): Plan.Closed = StepsBuilder.build(cfg.steps) +} diff --git a/pipeline/src/main/scala/com/kakao/actionbase/pipeline/runner/ClassResolver.scala b/pipeline/src/main/scala/com/kakao/actionbase/pipeline/runner/ClassResolver.scala new file mode 100644 index 00000000..3bec224f --- /dev/null +++ b/pipeline/src/main/scala/com/kakao/actionbase/pipeline/runner/ClassResolver.scala @@ -0,0 +1,32 @@ +package com.kakao.actionbase.pipeline.runner + +/** Resolves a Job or Step class name from a workflow YAML. The name may be: + * - a full FQN (`com.kakao.actionbase.pipeline.jobs.SparkPiJob`), + * - a short name (`SparkPiJob`), resolved relative to one of `roots`, + * - or a sub-package name (`subdir.MySource`), also resolved relative to a root. + * + * Roots are tried in order; first match wins. + */ +object ClassResolver { + + val JobRoots: Seq[String] = Seq("com.kakao.actionbase.pipeline.jobs") + + val StepRoots: Seq[String] = Seq( + "com.kakao.actionbase.pipeline.steps.source", + "com.kakao.actionbase.pipeline.steps.transform", + "com.kakao.actionbase.pipeline.steps.sink" + ) + + def resolve(name: String, roots: Seq[String]): Class[_] = { + val candidates = name +: roots.map(r => s"$r.$name") + candidates.view.flatMap(tryLoad).headOption.getOrElse { + throw new ClassNotFoundException( + s"Cannot resolve '$name' as a full FQN or under any of: ${roots.mkString(", ")}" + ) + } + } + + private def tryLoad(fqn: String): Option[Class[_]] = + try Some(Class.forName(fqn)) + catch { case _: ClassNotFoundException => None } +} diff --git a/pipeline/src/main/scala/com/kakao/actionbase/pipeline/runner/Expression.scala b/pipeline/src/main/scala/com/kakao/actionbase/pipeline/runner/Expression.scala new file mode 100644 index 00000000..04a99dfc --- /dev/null +++ b/pipeline/src/main/scala/com/kakao/actionbase/pipeline/runner/Expression.scala @@ -0,0 +1,122 @@ +package com.kakao.actionbase.pipeline.runner + +import scala.collection.JavaConverters._ +import scala.util.matching.Regex + +/** Resolves `${{ ... }}` expressions in a parsed YAML tree against a context. + * + * Vocabulary: + * - `env.` — workflow `env:` value (string) + * - `needs..result` — upstream job result: `success` / `failure` / `skipped` / `cancelled` + * - `needs..outputs.` — value emitted by an upstream job + * - `presets.` — entry from the workflow `presets:` section (any value, often a map) + * - `load('')` — content of another YAML file at `` relative to the workflow file's dir + * + * An expression that fills the entire string value (e.g. `"${{ presets.X }}"`) is replaced by the raw evaluated value + * (which may be a map / list, not just a string). Otherwise expressions are stringified and substituted within the + * surrounding text. + */ +object Expression { + + /** Inputs the evaluator needs. `needs` is empty at YAML-load time and populated at runtime. `loadYaml` is used by the + * `load(...)` form; injected so test mocks don't have to touch the file system. + */ + case class Context( + env: Map[String, String] = Map.empty, + presets: Map[String, Any] = Map.empty, + needs: Map[String, NeedsView] = Map.empty, + loadYaml: String => Any = _ => sys.error("load(...) not configured") + ) + + case class NeedsView(result: String, outputs: Map[String, String] = Map.empty) + + private val Token: Regex = """\$\{\{\s*(.+?)\s*\}\}""".r + private val WholeToken: Regex = """\A\s*\$\{\{\s*(.+?)\s*\}\}\s*\z""".r + private val LoadCall: Regex = """\Aload\(\s*'([^']*)'\s*\)\z""".r + + /** Resolve every string leaf in a YAML-shaped tree. Map / List structure preserved. + * + * `lenient = true` preserves the original `${{ ... }}` token when an expression cannot be evaluated against the + * current context (typically: `needs.*` at load-time, or operator forms like `needs.X.result == 'success'` inside + * `when:` that this evaluator does not parse). At runtime the same tree is walked again with full context to finish + * those. + */ + def resolveDeep(value: Any, ctx: Context, lenient: Boolean = false): Any = value match { + case s: String => + s match { + case WholeToken(expr) => evaluate(expr, ctx, lenient) // raw value, may be map / list + case other => + Token.replaceAllIn(other, m => Regex.quoteReplacement(stringify(evaluate(m.group(1), ctx, lenient)))) + } + case m: java.util.Map[_, _] => + m.asInstanceOf[java.util.Map[String, Any]] + .asScala + .map { case (k, v) => k -> resolveDeep(v, ctx, lenient) } + .toMap + .asJava + case m: scala.collection.Map[_, _] => + m.asInstanceOf[scala.collection.Map[String, Any]].map { case (k, v) => k -> resolveDeep(v, ctx, lenient) }.toMap + case l: java.util.List[_] => + l.asInstanceOf[java.util.List[Any]].asScala.map(resolveDeep(_, ctx, lenient)).asJava + case l: scala.collection.Iterable[_] => + l.map(resolveDeep(_, ctx, lenient)).toSeq + case other => other + } + + /** Evaluate an expression body (without the outer `${{ }}`) and return the raw value. + * + * In lenient mode, an unresolvable expression (unknown key, unsupported syntax) returns the original `${{ ... }}` + * string unchanged so it can be evaluated later by a fuller context. + */ + def evaluate(expr: String, ctx: Context, lenient: Boolean = false): Any = { + try evaluateImpl(expr, ctx) + catch { + case _: NoSuchElementException if lenient => "${{ " + expr.trim + " }}" + case _: IllegalArgumentException if lenient => "${{ " + expr.trim + " }}" + } + } + + private def evaluateImpl(expr: String, ctx: Context): Any = { + val e = expr.trim + if (e.startsWith("env.")) { + val k = e.stripPrefix("env.") + ctx.env.getOrElse(k, throw new NoSuchElementException(s"unknown env key: $k")) + } else if (e.startsWith("presets.")) { + val k = e.stripPrefix("presets.") + ctx.presets.getOrElse(k, throw new NoSuchElementException(s"unknown preset: $k")) + } else if (e.startsWith("needs.")) { + evalNeeds(e.stripPrefix("needs."), ctx) + } else if (e.startsWith("load(")) { + e match { + case LoadCall(path) => ctx.loadYaml(path) + case _ => throw new IllegalArgumentException(s"malformed load(): $e — expected `load('path')`") + } + } else { + throw new IllegalArgumentException(s"unknown expression: $e") + } + } + + private def evalNeeds(rest: String, ctx: Context): Any = { + val parts = rest.split('.') + if (parts.length < 2) throw new IllegalArgumentException(s"malformed needs.* expression: needs.$rest") + val id = parts(0) + val view = ctx.needs.getOrElse(id, throw new NoSuchElementException(s"unknown needs id: $id")) + parts(1) match { + case "result" => view.result + case "outputs" => + if (parts.length != 3) + throw new IllegalArgumentException(s"needs.$id.outputs requires a key: needs.$id.outputs.") + view.outputs.getOrElse( + parts(2), + throw new NoSuchElementException(s"unknown output: needs.$id.outputs.${parts(2)}") + ) + case other => throw new IllegalArgumentException(s"unknown needs field: needs.$id.$other") + } + } + + private def stringify(value: Any): String = value match { + case null => "" + case s: String => s + case other => other.toString + } +} diff --git a/pipeline/src/main/scala/com/kakao/actionbase/pipeline/runner/ExtendsResolver.scala b/pipeline/src/main/scala/com/kakao/actionbase/pipeline/runner/ExtendsResolver.scala new file mode 100644 index 00000000..c5c7f22d --- /dev/null +++ b/pipeline/src/main/scala/com/kakao/actionbase/pipeline/runner/ExtendsResolver.scala @@ -0,0 +1,93 @@ +package com.kakao.actionbase.pipeline.runner + +import com.kakao.actionbase.pipeline.runner.Expression.Context + +import scala.collection.JavaConverters._ + +/** Resolves `$extends` directives in a parsed YAML tree. + * + * For every map containing `$extends:`: + * 1. Evaluate the value (must be a single `${{ ... }}` expression that resolves to a map). 2. Recursively resolve + * `$extends` inside both the extended map and the surrounding sibling keys. 3. Deep-merge: the extended map + * provides defaults; the sibling map's keys override. + * + * `$` keys are reserved as processor directives — only `$extends` is recognized today; unknown `$` keys are left + * untouched (forward-compatible). Cycles are rejected via a depth limit. + */ +object ExtendsResolver { + + private val ExtendsKey = "$extends" + private val MaxDepth = 32 + private val WholeToken = """\A\s*\$\{\{\s*(.+?)\s*\}\}\s*\z""".r + + def resolve(value: Any, ctx: Context): Any = walk(value, ctx, depth = 0) + + private def walk(value: Any, ctx: Context, depth: Int): Any = value match { + case m: java.util.Map[_, _] => + walkMap(m.asInstanceOf[java.util.Map[String, Any]].asScala.toMap, ctx, depth) + case m: scala.collection.Map[_, _] => + walkMap(m.asInstanceOf[scala.collection.Map[String, Any]].toMap, ctx, depth) + case l: java.util.List[_] => + l.asInstanceOf[java.util.List[Any]].asScala.map(walk(_, ctx, depth)).asJava + case l: scala.collection.Iterable[_] => + l.map(walk(_, ctx, depth)).toSeq + case other => other + } + + private def walkMap(m: Map[String, Any], ctx: Context, depth: Int): java.util.Map[String, Any] = { + if (depth > MaxDepth) throw new IllegalStateException(s"$$extends nesting exceeds $MaxDepth (cycle?)") + + m.get(ExtendsKey) match { + case None => + m.map { case (k, v) => k -> walk(v, ctx, depth) }.toMap.asJava + + case Some(expr) => + val incoming = evalExtendsValue(expr, ctx) + val resolvedIncoming = walk(incoming, ctx, depth + 1) match { + case jm: java.util.Map[_, _] => jm.asInstanceOf[java.util.Map[String, Any]].asScala.toMap + case other => + throw new IllegalArgumentException(s"$$extends must resolve to a map, got: ${other.getClass.getSimpleName}") + } + val sibling = (m - ExtendsKey).map { case (k, v) => k -> walk(v, ctx, depth) } + deepMerge(resolvedIncoming, sibling).asJava + } + } + + private def evalExtendsValue(expr: Any, ctx: Context): Any = expr match { + case s: String => + s.trim match { + case WholeToken(inner) => Expression.evaluate(inner, ctx) + case other => + throw new IllegalArgumentException( + s"$$extends value must be a single `\\$${{ ... }}` expression, got: $other" + ) + } + case other => + throw new IllegalArgumentException(s"$$extends value must be a string expression, got: $other") + } + + /** Recursive deep-merge: maps merge key-by-key (sibling wins on conflict at non-map leaves); other values are + * replaced wholesale by the sibling. + */ + private def deepMerge(base: Map[String, Any], over: Map[String, Any]): Map[String, Any] = { + val keys = base.keySet ++ over.keySet + keys.iterator.map { k => + (base.get(k), over.get(k)) match { + case (Some(a), Some(b)) => + k -> mergeValue(a, b) + case (Some(a), None) => k -> a + case (None, Some(b)) => k -> b + case _ => sys.error("unreachable") + } + }.toMap + } + + private def mergeValue(a: Any, b: Any): Any = (a, b) match { + case (am: java.util.Map[_, _], bm: java.util.Map[_, _]) => + deepMerge( + am.asInstanceOf[java.util.Map[String, Any]].asScala.toMap, + bm.asInstanceOf[java.util.Map[String, Any]].asScala.toMap + ).asJava + case _ => b + } +} diff --git a/pipeline/src/main/scala/com/kakao/actionbase/pipeline/runner/StepsBuilder.scala b/pipeline/src/main/scala/com/kakao/actionbase/pipeline/runner/StepsBuilder.scala new file mode 100644 index 00000000..4fdd26d7 --- /dev/null +++ b/pipeline/src/main/scala/com/kakao/actionbase/pipeline/runner/StepsBuilder.scala @@ -0,0 +1,126 @@ +package com.kakao.actionbase.pipeline.runner + +import com.kakao.actionbase.pipeline.dsl._ +import com.kakao.actionbase.pipeline.workflow.StepSpec + +import scala.collection.mutable + +/** Reflectively assembles a runnable `Plan.Closed` from a list of `StepSpec`s. Each spec's `args:` map is bound onto + * the Step's case-class fields via Jackson; the resulting AST is wired together as a DAG. + * + * Wiring rules: + * - Source steps stand alone (no upstream). + * - A step's upstream inputs are resolved from `inputs:` labels first; if absent, the previous step's output is used + * (linear-chain default). + * - `as: