udf/worker/README.md
Package structure for the UDF worker framework described in the design references below.

## Overview

Spark processes a UDF by obtaining a **WorkerDispatcher** from a worker
specification. The dispatcher manages workers behind the scenes. From
the dispatcher, Spark gets a **WorkerSession** -- one per UDF invocation --
with an Iterator-to-Iterator `process` API that streams input batches
through the worker and returns result batches.

```
UDFWorkerSpecification   -- how to create and configure workers
          |
          v
WorkerDispatcher         -- manages workers, creates sessions
          |
          v
WorkerSession            -- one UDF execution
          |  1. session.init(InitMessage(payload, inputSchema, outputSchema))
          |  2. val results = session.process(inputBatches)
          |  3. session.close()
```
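
The session surface implied by this flow is small. A rough sketch, with types assumed for illustration (the actual `WorkerSession` trait and `InitMessage` live in `core/` and may differ):

```scala
// Assumed shapes, for illustration only -- see core/WorkerSession.scala.
case class InitMessage(
    functionPayload: Array[Byte],  // serialized UDF
    inputSchema: Array[Byte],      // e.g., serialized Arrow schema
    outputSchema: Array[Byte])

trait WorkerSession extends AutoCloseable {
  def init(msg: InitMessage): Unit
  // Lazy, pull-based: batches flow through the worker as the result
  // iterator is consumed.
  def process(input: Iterator[Array[Byte]]): Iterator[Array[Byte]]
  def cancel(): Unit
}
```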

How workers are created depends on the dispatcher implementation. The
framework currently provides **direct worker creation** (local OS
processes) and is designed for future **indirect creation** (via a
provisioning service or daemon).

## Sub-packages

```
udf/worker/
├── proto/
│     worker_spec.proto          -- UDFWorkerSpecification protobuf (+ generated Java classes)
│     common.proto               -- shared enums (UDFWorkerDataFormat, etc.)
└── core/                        -- abstract interfaces
      WorkerDispatcher.scala     -- creates sessions, manages worker lifecycle
      WorkerSession.scala        -- per-UDF init/process/cancel/close + InitMessage
      WorkerConnection.scala     -- transport channel abstraction
      WorkerSecurityScope.scala  -- security boundary for worker pooling
      └── direct/                -- "direct" creation: local OS processes
            DirectWorkerDispatcher.scala -- spawns processes, env lifecycle
            DirectWorkerProcess.scala    -- OS process + connection + UDS socket
            DirectWorkerSession.scala    -- session backed by a direct process
```

The `core/` package defines abstract interfaces that are independent of how
workers are created. The `core/direct/` sub-package implements "direct"
worker creation where Spark spawns local OS processes. Future packages
(e.g., `core/indirect/`) can implement alternative creation modes such as
obtaining workers from a provisioning service or daemon.

### Direct worker creation

`DirectWorkerDispatcher` spawns worker processes locally. On the first
session, it runs the optional environment lifecycle callables from the
`UDFWorkerSpecification`:

- **`environmentVerification`** -- checks if the environment is ready
(exit 0 = ready). When it succeeds, installation is skipped.
- **`installation`** -- prepares the environment (installs runtime,
dependencies, worker binaries). Only runs when verification is absent
or fails.
- **`environmentCleanup`** -- runs after the dispatcher is closed or on
JVM shutdown to clean up temporary resources.

Environment setup runs **once per dispatcher** (not per session).
Workers are terminated via SIGTERM/SIGKILL when the dispatcher is closed.
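
A minimal sketch of that decision logic, assuming a hypothetical `runToExit` helper that runs a `ProcessCallable` to completion and returns its exit code (the actual `DirectWorkerDispatcher` internals may differ):

```scala
import org.apache.spark.udf.worker.{ProcessCallable, WorkerEnvironment}

// Hypothetical helper: run the callable's command and wait for its exit code.
def runToExit(c: ProcessCallable): Int =
  new ProcessBuilder(c.getCommandList).inheritIO().start().waitFor()

def ensureEnvironment(env: WorkerEnvironment): Unit = {
  // Exit code 0 from verification means the environment is already ready.
  val ready = env.hasEnvironmentVerification &&
    runToExit(env.getEnvironmentVerification) == 0
  // Install only when verification is absent or failed.
  if (!ready && env.hasInstallation) {
    runToExit(env.getInstallation)
  }
}
```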

## Basic usage (Scala)

```scala
import org.apache.spark.udf.worker.{
  DirectWorker, ProcessCallable, UDFProtoCommunicationPattern,
  UDFWorkerDataFormat, UDFWorkerProperties, UDFWorkerSpecification,
  UnixDomainSocket, WorkerCapabilities, WorkerConnectionSpec, WorkerEnvironment}
import org.apache.spark.udf.worker.core._

// 1. Define a worker spec (direct creation mode).
val spec = UDFWorkerSpecification.newBuilder()
  .setEnvironment(WorkerEnvironment.newBuilder()
    .setEnvironmentVerification(ProcessCallable.newBuilder()
      .addCommand("python").addCommand("-c").addCommand("import my_udf_worker").build())
    .setInstallation(ProcessCallable.newBuilder()
      .addCommand("pip").addCommand("install").addCommand("my_udf_worker").build())
    .build())
  .setCapabilities(WorkerCapabilities.newBuilder()
    .addSupportedDataFormats(UDFWorkerDataFormat.ARROW)
    .addSupportedCommunicationPatterns(
      UDFProtoCommunicationPattern.BIDIRECTIONAL_STREAMING)
    .build())
  .setDirect(DirectWorker.newBuilder()
    .setRunner(ProcessCallable.newBuilder()
      .addCommand("python").addCommand("-m").addCommand("my_udf_worker").build())
    .setProperties(UDFWorkerProperties.newBuilder()
      .setConnection(WorkerConnectionSpec.newBuilder()
        .setUnixDomainSocket(UnixDomainSocket.getDefaultInstance).build())
      .build())
    .build())
  .build()

// 2. Create a dispatcher. Use a protocol-specific subclass of
//    DirectWorkerDispatcher (e.g., gRPC over UDS).
val dispatcher: WorkerDispatcher = ...

// 3. Create a session for one UDF execution.
val session = dispatcher.createSession(securityScope = None)
try {
  // 4. Initialize with the serialized function and schemas.
  session.init(InitMessage(
    functionPayload = serializedFunction,
    inputSchema = arrowInputSchema,
    outputSchema = arrowOutputSchema))

  // 5. Process data -- Iterator in, Iterator out.
  val results: Iterator[Array[Byte]] =
    session.process(inputBatches)

  // Consume results lazily.
  results.foreach(processResultBatch)
} finally {
  session.close()
}

// 6. Shut down all workers.
dispatcher.close()
```

## Build

SBT:
```
build/sbt "udf-worker-core/compile"
build/sbt "udf-worker-core/test"
build/sbt "udf-worker-proto/compile" "udf-worker-core/compile"
```

Maven:
```
build/mvn compile -pl udf/worker/proto,udf/worker/core -am
```

## Test

SBT:
```
build/sbt "udf-worker-core/test"
```

## Current status

This is the **first MVP** providing the core abstraction layer and the
direct worker dispatcher.
The following are left as TODOs:

- **Connection pooling** -- reuse workers across sessions (see the sketch after this list)
- **Security scope isolation** -- partition pools by `WorkerSecurityScope`
- **Indirect worker creation** -- obtain workers from a service or daemon
- **Protocol-specific implementations** -- e.g., gRPC over UDS
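
As a sketch of where the first two TODOs are headed, a pool keyed by `WorkerSecurityScope` would keep worker reuse within a security boundary. Everything below is illustrative; none of it exists in the codebase yet:

```scala
import scala.collection.mutable

import org.apache.spark.udf.worker.core.WorkerSecurityScope

// `Worker` stands in for whatever handle the dispatcher would pool.
class ScopedWorkerPool[Worker](spawn: () => Worker) {
  private val idle = mutable.Map.empty[WorkerSecurityScope, mutable.Queue[Worker]]

  def acquire(scope: WorkerSecurityScope): Worker = synchronized {
    // Only workers from the same scope are eligible for reuse.
    idle.getOrElseUpdate(scope, mutable.Queue.empty)
      .removeHeadOption()
      .getOrElse(spawn())
  }

  def release(scope: WorkerSecurityScope, w: Worker): Unit = synchronized {
    idle.getOrElseUpdate(scope, mutable.Queue.empty).enqueue(w)
  }
}
```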

## Design references

* [SPIP Language-agnostic UDF Protocol for Spark](https://docs.google.com/document/d/19Whzq127QxVt2Luk0EClgaDtcpBsFUp67NcVdKKyPF8/edit?tab=t.0)

/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.udf.worker.core

import java.io.File

import org.apache.spark.annotation.Experimental

/**
* :: Experimental ::
* A [[WorkerConnection]] over a Unix domain socket. Owns the socket
* path and removes the socket file on [[close]]. Subclasses provide the
* protocol-specific channel (e.g. gRPC over UDS) and may override
* [[close]] to add transport-level shutdown -- they should call
* `super.close()` to ensure the socket file is removed.
*
* [[close]] is idempotent: deleting an already-removed file is a no-op.
*/
@Experimental
abstract class UnixSocketWorkerConnection(val socketPath: String)
  extends WorkerConnection {

  override def close(): Unit = {
    val f = new File(socketPath)
    if (f.exists()) f.delete()
  }
}
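
For example, a hypothetical gRPC-over-UDS subclass (not part of the framework; `io.grpc.ManagedChannel` is standard gRPC API) would shut the channel down before delegating to `super.close()`:

```scala
import io.grpc.ManagedChannel

class GrpcUdsWorkerConnection(socketPath: String, channel: ManagedChannel)
  extends UnixSocketWorkerConnection(socketPath) {

  override def isActive: Boolean =
    !channel.isShutdown && !channel.isTerminated

  override def close(): Unit = {
    channel.shutdownNow()  // transport-level shutdown first
    super.close()          // then remove the socket file (idempotent)
  }
}
```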
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.udf.worker.core

import org.apache.spark.annotation.Experimental

/**
* :: Experimental ::
* A transport-level connection to a running UDF worker process.
*
* A [[WorkerConnection]] represents the communication channel between the
* Spark engine and a single worker process (e.g., a gRPC channel over a
* Unix domain socket, or a raw TCP socket). It is owned by a worker
* process wrapper (e.g., [[direct.DirectWorkerProcess]]) and shared
* across all [[WorkerSession]]s that use that process.
*
* One connection, many sessions: the worker exposes a single server-side
* endpoint that all sessions share. For gRPC, per-session work lives on
* multiplexed streams over this channel.
*
* Implementations expose only lifecycle. Data transmission happens at
* the [[WorkerSession]] level -- this class is solely about whether the
* channel is open.
*
* '''Relationship to other classes (direct creation mode):'''
* {{{
* DirectWorkerProcess 1 --- 1 WorkerConnection (transport over UDS)
* DirectWorkerProcess 1 --- * WorkerSession (UDF executions)
* }}}
*/
@Experimental
abstract class WorkerConnection extends AutoCloseable {

  /** Returns true if the underlying transport channel is still usable. */
  def isActive: Boolean
}

/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.udf.worker.core

import org.apache.spark.annotation.Experimental
import org.apache.spark.udf.worker.UDFWorkerSpecification

/**
* :: Experimental ::
* Manages workers for a single [[UDFWorkerSpecification]] and hides worker details from Spark.
*
* A [[WorkerDispatcher]] is created from a worker specification (plus context such
* as security scope). It owns the underlying worker processes and connections,
* and hides pooling, reuse, and termination from Spark.
*/
@Experimental
trait WorkerDispatcher extends AutoCloseable {

def workerSpec: UDFWorkerSpecification

/**
* Creates a [[WorkerSession]] that maps to one single UDF execution.
*/
def createSession(securityScope: Option[WorkerSecurityScope]): WorkerSession
}

/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.udf.worker.core

import org.apache.spark.annotation.Experimental

/**
* :: Experimental ::
* Minimal logging surface used by the udf/worker framework.
*
* The framework deliberately does not depend on SLF4J (or any other
* concrete logging backend) so callers can embed it without dragging a
* specific logger onto the classpath. Embedders should supply an
* adapter that forwards to their preferred backend (Spark's `Logging`
* trait, SLF4J, java.util.logging, etc.).
*
* Only the methods actually used by the framework are exposed.
* Messages are passed by-name so the formatting cost is avoided when
* the backend decides to drop the event.
*/
@Experimental
trait WorkerLogger {
  def warn(msg: => String): Unit
  def warn(msg: => String, t: Throwable): Unit
  def debug(msg: => String): Unit
  def debug(msg: => String, t: Throwable): Unit
}

object WorkerLogger {
  /** Discards all messages. Default for callers that don't wire up logging. */
  val NoOp: WorkerLogger = new WorkerLogger {
    override def warn(msg: => String): Unit = ()
    override def warn(msg: => String, t: Throwable): Unit = ()
    override def debug(msg: => String): Unit = ()
    override def debug(msg: => String, t: Throwable): Unit = ()
  }
}
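
For example, an adapter forwarding to SLF4J, as the scaladoc suggests embedders supply (illustrative, not shipped with the framework):

```scala
import org.slf4j.Logger

class Slf4jWorkerLogger(underlying: Logger) extends WorkerLogger {
  // Check the level first so the by-name message is only rendered
  // when the backend will actually emit it.
  override def warn(msg: => String): Unit =
    if (underlying.isWarnEnabled) underlying.warn(msg)
  override def warn(msg: => String, t: Throwable): Unit =
    if (underlying.isWarnEnabled) underlying.warn(msg, t)
  override def debug(msg: => String): Unit =
    if (underlying.isDebugEnabled) underlying.debug(msg)
  override def debug(msg: => String, t: Throwable): Unit =
    if (underlying.isDebugEnabled) underlying.debug(msg, t)
}
```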