Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package com.kakao.actionbase.pipeline.dsl

import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import com.fasterxml.jackson.module.scala.{ClassTagExtensions, DefaultScalaModule}
import org.apache.spark.sql.SparkSession

import scala.reflect.ClassTag

/** One spark-submit unit. Subclass and implement `plan(cfg)`.
  *
  * Two entry points share the same Cfg-binding mapper:
  *   - `main(argv)` — for spark-submit / CLI: parses `--key=value` argv into Cfg.
  *   - `planFromMap(args)` — for in-process runners: binds an arbitrary YAML-shaped Map onto Cfg, supporting nested
  *     fields like `Seq[StepSpec]` that argv cannot represent.
  *
  * {{{
  * case class MyConfig(in: String, out: String)
  *
  * object MyJob extends Job[MyConfig] {
  *   override def plan(cfg: MyConfig): Plan.Closed =
  *     FileSource(cfg.in, "parquet") ~> MyTransform() ~> FileSink(cfg.out, "parquet")
  * }
  * }}}
  *
  * @tparam C
  *   the config type that arguments are bound onto; its runtime class is taken from the `ClassTag`.
  */
abstract class Job[C <: Product: ClassTag] {

  /** Build the runnable plan for an already-bound config. */
  def plan(cfg: C): Plan.Closed

  /** Hook for subclasses to tweak the SparkSession builder (master, configs). */
  protected def configure(builder: SparkSession.Builder): SparkSession.Builder = builder

  /** Bind `args` onto Cfg via Jackson and return the resulting Plan. Used by runners that already manage Spark.
    *
    * @param args
    *   key/value arguments converted to `C` with the shared `Job.mapper`.
    * @return
    *   the plan produced by `plan` for the bound config.
    */
  def planFromMap(args: Map[String, Any]): Plan.Closed = {
    val cls = implicitly[ClassTag[C]].runtimeClass.asInstanceOf[Class[C]]
    val cfg = Job.mapper.convertValue(args, cls)
    plan(cfg)
  }

  /** CLI entry point: parses `argv`, creates (or reuses) a SparkSession, runs the plan, and always stops the session
    * afterwards, even when the plan fails.
    *
    * @param argv
    *   raw command-line arguments; only `--key=value` entries are picked up by `Job.parseArgv`.
    */
  def main(argv: Array[String]): Unit = {
    val args = Job.parseArgv(argv)
    println(s"Running ${getClass.getSimpleName}: $args")

    // `getCanonicalName` is null for anonymous and local classes (e.g. `new Job[Cfg] { ... }`),
    // which previously caused a NullPointerException on `stripSuffix`. Fall back to the binary name.
    val appName = Option(getClass.getCanonicalName).getOrElse(getClass.getName).stripSuffix("$")

    val spark = configure(SparkSession.builder().appName(appName)).getOrCreate()

    try planFromMap(args).run()(spark)
    finally {
      println("Stopping Spark session...")
      spark.stop()
    }
  }
}

object Job {

  // Shared Cfg-binding mapper. Unknown keys are tolerated (so `--extra=...` doesn't fail),
  // while FAIL_ON_NULL_FOR_PRIMITIVES is switched on so that primitives are meant to fail
  // loudly instead of silently becoming 0/false.
  @transient private[pipeline] lazy val mapper: ObjectMapper with ClassTagExtensions = {
    val strict = new ObjectMapper() with ClassTagExtensions
    strict
      .registerModule(DefaultScalaModule)
      .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
      .configure(DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES, true)
    strict
  }

  // Collects `--key=value` arguments into a map. Entries that do not start with `--`, or that
  // carry no `=` after the prefix, are ignored. Only the first `=` separates key from value, so
  // values may themselves contain `=`. When a key repeats, the last occurrence wins.
  private def parseArgv(argv: Array[String]): Map[String, Any] = {
    val pairs = for {
      arg <- argv.iterator
      if arg.startsWith("--")
      body = arg.substring(2)
      sep = body.indexOf('=')
      if sep >= 0
    } yield body.substring(0, sep) -> body.substring(sep + 1)
    pairs.toMap
  }
}
132 changes: 132 additions & 0 deletions pipeline/src/main/scala/com/kakao/actionbase/pipeline/dsl/Plan.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
package com.kakao.actionbase.pipeline.dsl

import org.apache.spark.sql.{DataFrame, SparkSession}

import scala.collection.mutable

// Type-state wrapper around the internal Ast.
// Open = chain ends in a Source or Transform; not yet runnable.
// Closed = chain ends in a Sink (or Fork of Sinks); runnable.
// The trait is sealed, so the only Plan subtypes are the ones declared in this file.
sealed trait Plan {
// The AST node this plan wraps; visible only inside the `dsl` package.
private[dsl] def ast: Ast
}

object Plan {

  /** A chain that still needs a Sink before it can run.
    *
    * @param ast
    *   the AST built so far.
    * @param edgeLabel
    *   the label under which this step's output is handed to the next Transform; `"_0"` unless `as` was called.
    */
  final class Open private[dsl] (
      private[dsl] val ast: Ast,
      private[dsl] val edgeLabel: String = "_0"
  ) extends Plan {

    // This step's output paired with its label, as consumed by `Ast.Tx`.
    private def labeled: (String, Ast) = (edgeLabel, ast)

    /** Name this step's output for the consumer's view of it (e.g., the temp view name a `SqlTransform` will see).
      * Defaults to `"_0"` if not called.
      */
    def as(label: String): Open = new Open(ast, label)

    /** Append a single-input Transform; the result is still open. */
    def ~>(t: Transform): Open = {
      val node = Ast.Tx(Seq(labeled), t)
      new Open(node)
    }

    /** Terminate the chain with a Sink, producing a runnable plan. */
    def ~>(s: Sink): Closed = {
      val node = Ast.Snk(ast, s)
      new Closed(node)
    }

    /** Combine with another labeled output to feed a multi-input Transform:
      *
      * `(users.as("u") + events.as("e")) ~> SqlTransform("... FROM u JOIN e ...")`.
      */
    def +(other: Open): MultiOpen = new MultiOpen(Seq(labeled, other.labeled))

    /** Send this step's output to several branches. Each branch function receives an `Open` standing for the fork
      * point (`Ast.Ref`) and must return a `Closed` plan. At least one branch is required.
      */
    def fanOut(branches: (Open => Closed)*): Closed = {
      require(branches.nonEmpty, "fanOut requires at least one branch")
      // Each branch gets its own placeholder Open rooted at Ast.Ref.
      val branchAsts = branches.map(branch => branch(new Open(Ast.Ref)).ast)
      new Closed(Ast.Fork(ast, branchAsts))
    }
  }

  /** Two or more labeled outputs combined for a multi-input Transform. Build via `Open.+`; finish with `~>`. */
  final class MultiOpen private[dsl] (private[dsl] val parts: Seq[(String, Ast)]) {

    /** Add one more labeled output to the combination. */
    def +(other: Open): MultiOpen = {
      val extra = (other.edgeLabel, other.ast)
      new MultiOpen(parts :+ extra)
    }

    /** Feed all collected outputs into a Transform; the result is a single open chain. */
    def ~>(t: Transform): Open = new Open(Ast.Tx(parts, t))
  }

  /** A runnable plan; `run()` hands the AST to the `Executor`. */
  final class Closed private[dsl] (private[dsl] val ast: Ast) extends Plan {
    def run()(implicit spark: SparkSession): Unit = Executor.run(ast)
  }

  /** Escape hatch for runners that build the AST directly (e.g., `StepsBuilder` assembling a DAG from YAML). */
  private[pipeline] def closed(ast: Ast): Closed = new Closed(ast)

  /** Same escape hatch for an AST that still needs a Sink; the label defaults to `"_0"`. */
  private[pipeline] def open(ast: Ast): Open = new Open(ast)
}

// Internal AST. Hidden from users so the DSL surface stays small.
// Nodes are case classes / a case object, so they compare structurally; the Executor relies on
// this when it uses nodes as keys of its materialization memo.
private[pipeline] sealed trait Ast
private[pipeline] object Ast {
// Leaf node: a Source to read from.
case class Src(s: Source) extends Ast
// A Transform applied to one or more upstream nodes, each paired with its label.
case class Tx(upstreams: Seq[(String, Ast)], t: Transform) extends Ast
// Terminal node: writes the upstream's output to a Sink.
case class Snk(upstream: Ast, s: Sink) extends Ast
// Fan-out: the upstream is computed once and each branch consumes it through `Ref`.
case class Fork(upstream: Ast, branches: Seq[Ast]) extends Ast
// Multiple terminal roots executed under a shared materialization memo, so a common upstream is built once even
// when several sinks consume it.
case class Group(roots: Seq[Ast]) extends Ast
// Placeholder for the fork's upstream inside a fanOut branch; invalid anywhere else.
case object Ref extends Ast
}

// Walks an Ast and executes it against Spark. One memo (AST node -> DataFrame) is created per
// `run` call so a node shared between several roots is built only once.
private[dsl] object Executor {

  /** Execute a terminal AST (Sink, Fork or Group) with a fresh materialization memo. */
  def run(ast: Ast)(implicit spark: SparkSession): Unit =
    runRoot(ast, mutable.Map.empty[Ast, DataFrame])

  // Executes one terminal node; anything that is not a Sink, Fork or Group is rejected.
  private def runRoot(ast: Ast, memo: mutable.Map[Ast, DataFrame])(implicit
      spark: SparkSession
  ): Unit =
    ast match {
      case Ast.Group(roots) =>
        // All roots share the caller's memo.
        for (root <- roots) runRoot(root, memo)

      case Ast.Snk(upstream, sink) =>
        val input = materialize(upstream, memo)
        sink.write(input)

      case Ast.Fork(upstream, branches) =>
        // Cache the shared upstream while the branches run, then release it without blocking.
        val shared = materialize(upstream, memo).cache()
        try {
          for (branch <- branches) runBranch(branch, shared)
        } finally {
          shared.unpersist(blocking = false)
        }

      case unsupported =>
        throw new IllegalStateException(
          s"Top-level Plan must end in a Sink, Fork, or Group, got: $unsupported"
        )
    }

  // Builds the DataFrame for a Source/Transform node, reusing the memoized one when present.
  // Nothing is stored in the memo when building fails.
  private def materialize(ast: Ast, memo: mutable.Map[Ast, DataFrame])(implicit
      spark: SparkSession
  ): DataFrame =
    memo.get(ast) match {
      case Some(known) => known
      case None =>
        val built: DataFrame = ast match {
          case Ast.Src(source) =>
            source.read()
          case Ast.Tx(upstreams, transform) =>
            val inputs = upstreams.map { case (label, upstream) =>
              (label, materialize(upstream, memo))
            }
            transform.apply(inputs)
          case Ast.Ref =>
            throw new IllegalStateException("Ast.Ref outside fanOut branch — invalid Plan")
          case unsupported =>
            throw new IllegalStateException(s"materialize cannot handle: $unsupported")
        }
        memo.update(ast, built)
        built
    }

  // Runs one fanOut branch: a chain of Transforms rooted at `Ast.Ref` (bound to `root`) that
  // ends in a Sink. Branch nodes are not memoized.
  private def runBranch(ast: Ast, root: DataFrame)(implicit spark: SparkSession): Unit = {
    def resolve(node: Ast): DataFrame = node match {
      case Ast.Ref => root
      case Ast.Tx(upstreams, transform) =>
        val inputs = upstreams.map { case (label, upstream) => (label, resolve(upstream)) }
        transform.apply(inputs)
      case unsupported =>
        throw new IllegalStateException(
          s"fanOut branch must be Transform*-then-Sink rooted at the fork, got: $unsupported"
        )
    }

    ast match {
      case Ast.Snk(upstream, sink) =>
        sink.write(resolve(upstream))
      case unsupported =>
        throw new IllegalStateException(s"fanOut branch must end in Sink, got: $unsupported")
    }
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package com.kakao.actionbase.pipeline.dsl

import org.apache.spark.sql.{DataFrame, SparkSession}

/** Common parent of the pipeline building blocks declared in this file: `Source`, `Transform` and `Sink`. Sealed, so
  * direct subtypes can only be declared here.
  */
sealed trait Step

/** A Source produces a DataFrame and takes no DataFrame input; it is the start of a chain. */
trait Source extends Step {
/** Produce this source's DataFrame using the implicit SparkSession. */
def read()(implicit spark: SparkSession): DataFrame
}

/** A Transform turns one or more labeled input DataFrames into a single output DataFrame. Inputs arrive as
  * `(label, df)` pairs so that a consumer (e.g., `SqlTransform`) can use the label as a temp view name without
  * re-stating it. The label is the producer's `as:` (or `"_0"` for a single-input chain default).
  */
trait Transform extends Step {

  /** Apply the transform to all labeled inputs. */
  def apply(inputs: Seq[(String, DataFrame)])(implicit spark: SparkSession): DataFrame

  /** Single-input convenience for DSL `~>` and unit tests. The implicit label is `"_0"`. */
  def apply(in: DataFrame)(implicit spark: SparkSession): DataFrame = {
    val single: Seq[(String, DataFrame)] = Seq(("_0", in))
    apply(single)
  }
}

/** A Sink consumes a single DataFrame. Multi-input fan-in isn't a Sink concern — wire multiple sinks as siblings via
  * `fanOut(...)` (Scala DSL) or repeat the sink step under different `inputs:` (YAML).
  */
trait Sink extends Step {
/** Consume `in`; returns nothing, so any effect is up to the implementation. */
def write(in: DataFrame)(implicit spark: SparkSession): Unit
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package com.kakao.actionbase.pipeline

import scala.language.implicitConversions

package object dsl {

  /** Lifts a `Source` into a `Plan.Open` so it can start a chain: `MySource(...) ~> MyTransform() ~> MySink(...)`.
    * The `~>` operators are defined on `Plan.Open`, not on `Source`, hence this conversion.
    */
  implicit def sourceToOpen(s: Source): Plan.Open = Plan.open(Ast.Src(s))
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package com.kakao.actionbase.pipeline.jobs

import com.kakao.actionbase.pipeline.dsl.{Job, Plan}
import com.kakao.actionbase.pipeline.runner.StepsBuilder
import com.kakao.actionbase.pipeline.workflow.StepSpec

/** Config for [[StepsRunnerJob]]: the inline `steps:` list the plan is built from. */
case class StepsRunnerCfg(steps: Seq[StepSpec])

/** Generic Job whose Plan is built at runtime from an inline `steps:` list in its Cfg. Lets a workflow YAML express a
  * Source ~> Transform* ~> Sink chain without requiring a hand-written Job class:
  *
  * {{{
  * jobs:
  *   pi:
  *     kind: spark
  *     artifact: "com.kakao.actionbase:pipeline:0.x"
  *     mainClass: StepsRunnerJob
  *     args:
  *       steps:
  *         - step: SampleSource
  *           args: { n: 1000000, columns: [x, y] }
  *         - step: SqlTransform
  *           args: { query: "SELECT ... FROM _0" }
  *         - step: ShowSink
  * }}}
  *
  * Plan construction is delegated entirely to `StepsBuilder.build`.
  */
object StepsRunnerJob extends Job[StepsRunnerCfg] {
override def plan(cfg: StepsRunnerCfg): Plan.Closed = StepsBuilder.build(cfg.steps)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package com.kakao.actionbase.pipeline.runner

/** Resolves a Job or Step class name from a workflow YAML. Accepted forms of the name:
  *   - a full FQN (`com.kakao.actionbase.pipeline.jobs.SparkPiJob`),
  *   - a short name (`SparkPiJob`), looked up under each of `roots`,
  *   - a sub-package name (`subdir.MySource`), also looked up under each root.
  *
  * The name is tried as given first, then under each root in order; the first class that loads wins.
  */
object ClassResolver {

  /** Package searched for Job classes. */
  val JobRoots: Seq[String] = Seq("com.kakao.actionbase.pipeline.jobs")

  /** Packages searched for Step classes, in lookup order. */
  val StepRoots: Seq[String] = Seq(
    "com.kakao.actionbase.pipeline.steps.source",
    "com.kakao.actionbase.pipeline.steps.transform",
    "com.kakao.actionbase.pipeline.steps.sink"
  )

  /** Load the class for `name`.
    *
    * @param name
    *   class name as written in the workflow (FQN, short name, or sub-package name).
    * @param roots
    *   package prefixes to try after the bare name, in order.
    * @throws java.lang.ClassNotFoundException
    *   when neither the bare name nor any rooted candidate can be loaded.
    */
  def resolve(name: String, roots: Seq[String]): Class[_] = {
    // Candidates are probed lazily, so nothing past the first hit is loaded.
    val candidates = Iterator.single(name) ++ roots.iterator.map(root => s"$root.$name")
    candidates
      .map(tryLoad)
      .find(_.isDefined)
      .flatten
      .getOrElse(
        throw new ClassNotFoundException(
          s"Cannot resolve '$name' as a full FQN or under any of: ${roots.mkString(", ")}"
        )
      )
  }

  // Attempts to load one candidate; only "class not found" is treated as a miss.
  private def tryLoad(fqn: String): Option[Class[_]] =
    try {
      Some(Class.forName(fqn))
    } catch {
      case _: ClassNotFoundException => None
    }
}
Loading
Loading