[SPARK-56412] Implement DirectWorkerDispatcher that spawns UDF workers as local callable. #55406
haiyangsun-db wants to merge 9 commits into apache:master from
Conversation
…ure. use a map to manage workers
dtenedor
left a comment
Thanks for working on this!
| while (!file.exists() && attempts < maxAttempts) {
|   if (!process.isAlive) {
|     val tail = readOutputTail(outputFile)
|     throw new RuntimeException(
can we please make an error class for this?
added a DirectWorkerException - intentionally not a Spark exception, to reduce dependencies on Spark in this module; the caller (Spark) should wrap the exception into Spark exceptions.
| // deliberately not registered: it is redundant with the explicit cleanup,
| // it leaks memory in long-lived JVMs (the JDK retains the path string for
| // the process lifetime), and it only works on empty directories.
| private val socketDir = Files.createTempDirectory("spark-udf-worker")
Files.createTempDirectory("spark-udf-worker") on POSIX systems creates 0700 directories by default, but it relies on the JDK to honor PosixFilePermissions. On filesystems that don't support POSIX (e.g., some mounted tmpfs configs or containerized setups with unusual umasks), the mode falls back to the default, which may be group-readable. For UDS sockets this allows other local users to attempt to connect. Worth either:
- explicitly passing a FileAttribute<Set<PosixFilePermission>> with rwx------, or
- at minimum, asserting the returned directory has 0700 and failing loudly otherwise.
Given this is UDF RPC, local-user isolation is a meaningful security property.
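A minimal sketch of the first suggestion, in plain Java (the module itself is Scala); the helper name `createPrivateTempDir` is illustrative, not from the PR:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.FileAttribute;
import java.nio.file.attribute.PosixFilePermission;
import java.nio.file.attribute.PosixFilePermissions;
import java.util.Set;

public class PrivateTempDir {
    // Hypothetical helper: request rwx------ explicitly at creation time and
    // fail loudly if the filesystem did not honor the attribute.
    static Path createPrivateTempDir(String prefix) throws IOException {
        Set<PosixFilePermission> ownerOnly = PosixFilePermissions.fromString("rwx------");
        FileAttribute<Set<PosixFilePermission>> attr =
            PosixFilePermissions.asFileAttribute(ownerOnly);
        Path dir = Files.createTempDirectory(prefix, attr);
        // Verify: on non-POSIX stores the attribute may be silently ignored.
        Set<PosixFilePermission> actual = Files.getPosixFilePermissions(dir);
        if (!actual.equals(ownerOnly)) {
            throw new IOException("temp dir " + dir + " is not 0700: " + actual);
        }
        return dir;
    }

    public static void main(String[] args) throws IOException {
        Path dir = createPrivateTempDir("spark-udf-worker");
        System.out.println(PosixFilePermissions.toString(Files.getPosixFilePermissions(dir)));
        Files.delete(dir);
    }
}
```

Since umask can only clear bits, an explicitly requested 0700 stays 0700 on POSIX; the verification step covers the non-POSIX fallback case.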
| repeated WorkerConnection connections = 3;
| }
|
| message WorkerConnection {
There is org.apache.spark.udf.worker.WorkerConnection (proto-generated) and org.apache.spark.udf.worker.core.WorkerConnection (the Scala abstract class). The test file even has to alias them (WorkerConnection => WorkerConnectionProto). Every downstream implementer will hit this. Either rename one (e.g., WorkerTransport for the core abstraction, or WorkerConnectionSpec for the proto message) or add a prominent note to the README's "Basic usage" snippet pointing out the import-aliasing pattern.
| * - A '''[[WorkerConnection]]''' (the transport channel to that process).
| * - A '''socket path''' (a UDS socket file) that both sides use.
| *
| * Multiple [[DirectWorkerSession]]s may share the same process when the
DirectWorkerProcess Scaladoc oversells concurrency:
Multiple [[DirectWorkerSession]]s may share the same process when the worker supports concurrent UDFs.
Today nothing shares — every createSession spawns a fresh process. The ref-count infrastructure is in place but unused. The doc should either say "will support sharing once pooling lands" or "the dispatcher currently creates one process per session; the ref-count is scaffolding for future pooling." As written, a new reader will look for the sharing code and not find it.
updated comments, but we will very likely add idle pooling in follow-ups.
| // (Required) Exactly one entry today; the field is repeated to allow
| // additional connections (e.g., data + control) to be added without a
| // schema-breaking migration.
| repeated WorkerConnection connections = 3;
worker_spec.proto, in the doc around the connection field: engine-assinged → engine-assigned.
| * Tests for [[DirectWorkerDispatcher]] process lifecycle: spawning workers
| * and terminating them on close.
| */
| class DirectWorkerDispatcherSuite
Test coverage concerns:
The current suite is good for happy paths and most error paths, but these gaps stand out.
- No timeout tests. Three timeout behaviors are implemented, none covered:
  - initTimeoutMs -- worker never creates the socket. Covered by the "early-exit" test only for the process-dies variant, not the process-hangs-without-binding variant.
  - callableTimeoutMs -- installation/verification/cleanup runs past the limit. The PR has no test that exercises destroyForcibly + the "timed out after…" error.
  - gracefulTimeoutMs -- worker ignores SIGTERM (a nontrivial scenario; a bash trap '' SIGTERM would do it). The escalation to SIGKILL is a critical correctness property and is untested.
  Each of these can be simulated with a short bash -c 'trap "" SIGTERM; sleep 999' or sleep 999 script and an overridden callableTimeoutMs/gracefulTimeoutMs.
- No test for concurrent ensureEnvironmentReady. "Environment setup runs only once across multiple sessions" runs createSession sequentially. The lock claim -- that two concurrent initial createSessions don't double-install -- is not verified. A barrier-synchronized pair of threads, with an installation script that appends to a file, would close this.
- No test for the shutdown-hook path. registerEnvironmentCleanupHook is exercised only indirectly. There is no test that the hook actually runs environmentCleanup when the JVM shuts down (which is admittedly hard to unit-test), nor that close() deregisters the hook (a forked JVM that exits after close() and checks via a side-channel file would work).
- cancel() is not tested at all. WorkerSession.cancel has a thread-safety contract ("must work from a different thread than process"). The stub in the suite implements it as a no-op, and no test verifies any implementation. Fine for a scaffolding PR, but add a TODO in the test suite so the next PR that adds a real session type doesn't forget.
- No assertion that the socket directory is cleaned up. close() deletes socketDir, but the suite never checks that socketDir no longer exists after dispatcher.close(). Easy to add: store the path via a testing hook or from new File(session.workerProcess.socketPath).getParentFile and assert !exists() post-close.
- No negative test for dispatcher reuse after close. Calling dispatcher.createSession(None) after dispatcher.close() should probably throw. Today it most likely proceeds and creates an orphaned worker (see concern #1). Adding a test that it fails would both lock in the fix and document the behavior.
- "concurrent createSession calls produce distinct workers" uses assert(sessions.size == threads). Good, but it doesn't verify that each worker's socket path is unique. workerObjects.distinct only checks object identity; if two threads somehow shared state, a simple sanity check on map(_.socketPath).distinct.size == threads strengthens the assertion.
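The SIGTERM-trapping worker from the timeout bullet can be demonstrated standalone in plain Java (not the actual suite; the bash one-liner is the review's suggestion, the class name is illustrative):

```java
import java.util.concurrent.TimeUnit;

public class SigtermEscalationDemo {
    public static void main(String[] args) throws Exception {
        // A stand-in "worker" that installs an empty SIGTERM trap, as suggested.
        Process p = new ProcessBuilder("bash", "-c", "trap '' TERM; sleep 999").start();
        Thread.sleep(200);                 // give bash time to install the trap

        p.destroy();                       // graceful attempt: SIGTERM, ignored
        boolean exited = p.waitFor(500, TimeUnit.MILLISECONDS);
        System.out.println("exited after SIGTERM: " + exited);

        p.destroyForcibly();               // escalation: SIGKILL, cannot be trapped
        p.waitFor(5, TimeUnit.SECONDS);    // bounded wait so the child is reaped
        System.out.println("alive after SIGKILL: " + p.isAlive());
    }
}
```

This is the shape of the escalation assertion: SIGTERM leaves the process alive past the graceful window, SIGKILL does not.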
tests added (which also explains the now-large PR size).
Thanks!
Another one: no test for close() concurrent with an in-flight createSession. You test createSession after close, and concurrent createSession calls against each other, but the "caller is in the middle of spawnWorker when another thread calls close()" race - which the acquire-before-publish + re-check pattern is specifically designed for - isn't exercised. A barrier-synchronized test (one thread blocks inside an overridden createSessionForWorker, another thread calls close(), assert the first thread gets either a session that is promptly torn down or an IllegalStateException) would lock in the invariant.
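The barrier-synchronized shape of that test can be sketched against a toy dispatcher in plain Java (the PR's classes are Scala; `createSession`, `closed`, and the barrier choreography here are simplified stand-ins):

```java
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;

public class RaceTestSketch {
    static final CyclicBarrier barrier = new CyclicBarrier(2);
    static final AtomicBoolean closed = new AtomicBoolean(false);
    static final AtomicBoolean sessionTornDown = new AtomicBoolean(false);

    static String createSession() throws Exception {
        barrier.await();               // A is "mid-spawn"; let B run close()
        barrier.await();               // resume only after close() finished
        if (closed.get()) {            // post-publish re-check
            sessionTornDown.set(true); // tear down the just-spawned worker
            throw new IllegalStateException("closed during createSession");
        }
        return "session";
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        Future<String> a = pool.submit(RaceTestSketch::createSession);
        Future<?> b = pool.submit(() -> {
            try {
                barrier.await();       // A is now mid-spawn
                closed.set(true);      // close() wins the race
                barrier.await();       // release A
            } catch (Exception e) { throw new RuntimeException(e); }
        });
        b.get();
        try {
            a.get();
            System.out.println("got a live session");
        } catch (ExecutionException e) {
            System.out.println("caller saw: " + e.getCause().getMessage());
        }
        System.out.println("torn down: " + sessionTornDown.get());
        pool.shutdown();
    }
}
```

The assertion the real test would make is exactly the disjunction the reviewer states: the racing caller observes either an IllegalStateException or a session whose worker was promptly torn down.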
Nice piece of work -- the abstractions (dispatcher / session / connection / logger), the env-lifecycle state machine, and the test suite all look thoughtful. Most of what follows is either narrow race windows or small papercuts worth considering; none of it is blocking.
Correctness / robustness
- Zombie children on failed spawn.
  `cleanupFailedSpawn` (DirectWorkerDispatcher.scala:783) and the `if (process.isAlive) process.destroyForcibly()` branch inside `waitForSocket` (~line 834) call `destroyForcibly()` but never `process.waitFor(...)`. On Linux the child becomes a zombie until the JVM reaps it, which in long-lived drivers can stack up. A short bounded `waitFor(gracefulTimeoutMs, ...)` after the kill would match what `DirectWorkerProcess.close()` already does.
- JVM shutdown hook not registered on install failure.
  `ensureEnvironmentReady` only calls `registerEnvironmentCleanupHook()` on the success path. If installation partially succeeds then fails (e.g., half-copied files), and the process is killed before anyone calls `dispatcher.close()`, the cleanup callable never runs. `runEnvironmentCleanup()` already handles the `Failed` state via the `case _` branch, so you could register the hook unconditionally the moment installation starts -- or at minimum document this as intentional.
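The kill-then-reap pattern from the first bullet, sketched in plain Java (the PR is Scala; `destroyForciblyAndReap` mirrors the name the fix later introduces, but this toy version is not the PR's code):

```java
import java.io.IOException;
import java.util.concurrent.TimeUnit;

public class ReapDemo {
    // SIGKILL alone leaves a zombie entry in the process table until the
    // parent waits on the child; a bounded waitFor lets the kernel reap it.
    static void destroyForciblyAndReap(Process p, long reapTimeoutMs)
            throws InterruptedException {
        p.destroyForcibly();
        if (!p.waitFor(reapTimeoutMs, TimeUnit.MILLISECONDS)) {
            System.err.println("worker still alive " + reapTimeoutMs + "ms after SIGKILL");
        }
    }

    public static void main(String[] args) throws IOException, InterruptedException {
        Process p = new ProcessBuilder("sleep", "999").start();
        destroyForciblyAndReap(p, 5000);
        System.out.println("alive=" + p.isAlive());
    }
}
```

Without the `waitFor`, `isAlive()` eventually flips to false once the JVM's internal reaper thread runs, but the bounded wait makes the reap deterministic at the call site.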
Overall LGTM modulo the items above -- happy to re-review once you decide which (if any) to address.
Concurrency / correctness:
- Introduce AtomicBoolean `closed` on DirectWorkerDispatcher; reject
createSession after close; re-check after workers.put so any worker
spawned concurrently with close() tears itself down via the
ref-count callback instead of leaking. close() itself is idempotent
via CAS.
- Acquire the session ref-count BEFORE publishing to `workers`; a
concurrent close() iterating the map can no longer tear down a
worker whose caller is about to increment the count.
- Introduce DirectWorkerDispatcher.destroyForciblyAndReap: SIGKILL
plus a bounded waitFor so the kernel actually reaps the child.
Used from cleanupFailedSpawn, waitForSocket, runCallable timeout,
and both kill paths in DirectWorkerProcess.close(). Separate
SIGKILL_REAP_TIMEOUT_MS (5s, distinct from gracefulTimeoutMs); logs
a warning with a context tag if the reap window expires.
- waitForSocket terminal branch reports the worker's exit code when
it exited cleanly before creating the socket, instead of the
ambiguous "did not create within Nms" message.
- Mark `closed`, `workers`, `cleanupHook` as `private[this]` so the
lifecycle state cannot be touched from other instances.
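The close/createSession protocol described above can be modeled compactly in plain Java (the PR's code is Scala and manages real worker processes; `Worker`, the ref-count, and the map key here are toy stand-ins):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

public class CloseSafeDispatcher {
    static final class Worker {
        final AtomicBoolean torn = new AtomicBoolean(false);
        private int refs = 0;
        synchronized void acquire() { refs++; }
        synchronized void release() { if (--refs == 0) tearDown(); }
        void tearDown() { torn.set(true); }
    }

    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final Map<Long, Worker> workers = new ConcurrentHashMap<>();
    private long nextId = 0;

    Worker createSession() {
        if (closed.get()) throw new IllegalStateException("dispatcher is closed");
        Worker w = new Worker();
        w.acquire();                    // acquire BEFORE publishing, so a
        long id;                        // concurrent close() iterating the map
        synchronized (this) { id = nextId++; }
        workers.put(id, w);             // cannot reap a half-registered worker
        if (closed.get()) {             // re-check: close() may have raced us
            workers.remove(id);
            w.release();                // ref-count callback tears it down
            throw new IllegalStateException("dispatcher closed during createSession");
        }
        return w;
    }

    void close() {
        if (!closed.compareAndSet(false, true)) return;  // idempotent via CAS
        workers.values().forEach(Worker::tearDown);
        workers.clear();
    }

    public static void main(String[] args) {
        CloseSafeDispatcher d = new CloseSafeDispatcher();
        Worker w = d.createSession();
        d.close();
        d.close();                      // second close is a no-op
        System.out.println("worker torn down: " + w.torn.get());
        try {
            d.createSession();
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```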
Error model:
- Introduce DirectWorkerException (extends RuntimeException) in the
direct package. Replace all runtime-failure RuntimeException throws
in DirectWorkerDispatcher so callers can catch the specific type
instead of every RuntimeException. IllegalArgumentException and
IllegalStateException continue to signal programming errors.
Environment lifecycle:
- Register the cleanup hook up front in the Pending branch whenever
environment_cleanup is configured, independent of verify/install.
Cleanup is user-defined and may tear down worker state, temp
files, and other runtime artifacts beyond install output, so it
should honor the user's configuration regardless of whether setup
ran. Early registration also covers partial-install failure.
- assert(Thread.holdsLock(environmentLock)) in
registerEnvironmentCleanupHook so the docstring contract fails
loud if a future caller forgets the lock.
- TODO noting the per-dispatcher shutdown-hook retention cost in
long-running drivers, pointing at a shared coordinator as the
follow-up.
WorkerSession contract:
- Make init and process final on WorkerSession. Guard both with
AtomicBoolean and delegate to abstract doInit / doProcess.
Subclasses get "exactly once" / "at most once" for free and
cannot bypass the contract.
- Rewrite the InitMessage docstring: drop the PythonInputPartition /
UpdateDelegationTokens cross-references and mark it as a
placeholder that will be replaced by a proto message once the UDF
protocol lands.
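The final-with-AtomicBoolean-guard pattern above can be sketched in plain Java (the real `WorkerSession` is a Scala abstract class; this simplified stand-in only shows the guard mechanics):

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class GuardedSession {
    static abstract class WorkerSession {
        private final AtomicBoolean initialized = new AtomicBoolean(false);
        private final AtomicBoolean processed = new AtomicBoolean(false);

        // final: subclasses cannot bypass the "exactly once" contract.
        public final void init() {
            if (!initialized.compareAndSet(false, true)) {
                throw new IllegalStateException("init() called more than once");
            }
            doInit();
        }

        // final: "at most once, and only after init".
        public final void process() {
            if (!initialized.get()) throw new IllegalStateException("process() before init()");
            if (!processed.compareAndSet(false, true)) {
                throw new IllegalStateException("process() called more than once");
            }
            doProcess();
        }

        protected abstract void doInit();
        protected abstract void doProcess();
    }

    public static void main(String[] args) {
        WorkerSession s = new WorkerSession() {
            protected void doInit() { System.out.println("doInit"); }
            protected void doProcess() { System.out.println("doProcess"); }
        };
        s.init();
        s.process();
        try { s.init(); } catch (IllegalStateException e) { System.out.println("second init rejected"); }
        try { s.process(); } catch (IllegalStateException e) { System.out.println("second process rejected"); }
    }
}
```

Subclasses implement only `doInit` / `doProcess` and get the contract for free, which is the point of making the public methods final.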
Security:
- Create the socket directory with POSIX 0700 via
PosixFilePermissions.asFileAttribute so UDS sockets inside are
not reachable by other local users. Non-POSIX filesystems fall
back to File.setXxx; that path WARNs if any setXxx returns false
so operators see the degraded mode instead of silently shipping a
world-accessible directory.
Polish:
- Rename proto message WorkerConnection -> WorkerConnectionSpec to
disambiguate from the core Scala WorkerConnection abstraction.
Drop the WorkerConnectionProto import alias in the test suite
and update the README usage snippet.
- Fix typo in worker_spec.proto: engine-assinged -> engine-assigned.
- DirectWorkerProcess Scaladoc: say one process per session today;
the ref-count is scaffolding for future pooling, not live sharing.
- readOutputTail uses FileChannel.position for an O(1) unambiguous
seek instead of the FileInputStream.skip loop.
- Extract throwClosed() and throwWorkerExitedBeforeSocket() helpers
to deduplicate error-message construction.
- Catch IOException on the outputFile delete in cleanupFailedSpawn
so a cleanup failure does not mask the original spawn error.
- destroyForciblyAndReap early-returns on InterruptedException to
avoid a spurious "still alive after SIGKILL" warning when the
full reap window was never actually waited.
- Scaladoc on close() noting it does not drain in-flight
createSession calls; brace the CAS early-return for readability.
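The `FileChannel.position` tail read mentioned under Polish looks roughly like this in plain Java (a sketch, not the PR's `readOutputTail`): `position()` is an O(1) absolute seek, whereas `InputStream.skip` may skip fewer bytes than requested and must be looped.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class TailRead {
    // Read at most maxBytes from the end of the file.
    static String readOutputTail(Path file, int maxBytes) throws IOException {
        try (FileChannel ch = FileChannel.open(file, StandardOpenOption.READ)) {
            long size = ch.size();
            long start = Math.max(0, size - maxBytes);
            ch.position(start);                          // unambiguous O(1) seek
            ByteBuffer buf = ByteBuffer.allocate((int) (size - start));
            while (buf.hasRemaining() && ch.read(buf) != -1) { }
            return new String(buf.array(), 0, buf.position(), StandardCharsets.UTF_8);
        }
    }

    public static void main(String[] args) throws IOException {
        Path f = Files.createTempFile("worker-out", ".log");
        Files.writeString(f, "line1\nline2\nfatal: worker crashed\n");
        System.out.println(readOutputTail(f, 22));
        Files.delete(f);
    }
}
```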
Tests:
- createSession after close is rejected.
- Socket directory is owner-only (0700) on POSIX; use `assume`
instead of a silent skip on non-POSIX.
- Socket directory is removed after dispatcher.close.
- SIGKILL escalation: worker traps SIGTERM; assert the process is
reaped after close() and that close waited at least
gracefulTimeoutMs. Uses a 500ms graceful window to keep the test
bounded.
- initTimeoutMs: worker stays alive but never binds the socket;
assert the "did not create socket" error and reap.
- callableTimeoutMs: install sleeps past the timeout; assert
"Callable timed out" and reap.
- Concurrent createSession still installs exactly once -- races
many threads through ensureEnvironmentReady.
- Strengthen "distinct workers" test with a socketPath uniqueness
check (object identity alone is insufficient).
- TODO on StubWorkerSession noting cancel() needs real coverage
once a concrete session impl lands.
fcee940 to 8733a59
@Yicong-Huang thank you for the review - both suggestions are fixed.
sven-weber-db
left a comment
Overall, looks good to me. Left some comments below.
On a high level, I think there is some code duplication that we could reduce. For example, the environment installation callables and the callable for the worker process are identical. However, there are two different code paths handling them.
Additionally, the cleanup of worker processes is highly distributed and has different responsibilities depending on the worker process's current state. E.g., sometimes the DirectWorkerDispatcher is responsible for cleanup - other times it's the DirectWorkerProcess itself. Potentially, this could be unified to a single, central place. This would make the implementation much cleaner, simpler, and easier to follow.
…or time out, 3. better cleanup logic.
dtenedor
left a comment
Thanks again for working on this. One more test idea, and then this looks ready to go, I believe.
| // No-op when environment_cleanup is not configured.
| registerEnvironmentCleanupHook()
| val verified = env.hasEnvironmentVerification &&
|   runCallable(env.getEnvironmentVerification).exitCode == 0
Do we know that all of these non-zero error codes are non-retriable? Currently if we have a timeout, the state remains Pending and we retry.
fixed - added an explicit try/catch block and moved the state in case of failure.
| /** Registers the JVM shutdown hook that runs the cleanup callable. */
| private def registerEnvironmentCleanupHook(): Unit = {
|   assert(Thread.holdsLock(environmentLock),
This assert is disabled when -Xdisable-assertions is set. Should we use `if (...) { throw SparkException.internalError(...) }` instead?
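The unconditional check the reviewer suggests has this shape, sketched in plain Java (the real code is Scala and would use the `SparkException.internalError` helper; a plain `IllegalStateException` stands in for it here):

```java
public class LockContractDemo {
    private static final Object environmentLock = new Object();

    // Unlike `assert`, this check survives -Xdisable-assertions / -da.
    static void registerEnvironmentCleanupHook() {
        if (!Thread.holdsLock(environmentLock)) {
            throw new IllegalStateException(
                "registerEnvironmentCleanupHook must be called with environmentLock held");
        }
        // ... register the shutdown hook ...
    }

    public static void main(String[] args) {
        synchronized (environmentLock) {
            registerEnvironmentCleanupHook();   // ok: lock held
            System.out.println("ok under lock");
        }
        try {
            registerEnvironmentCleanupHook();   // contract violation
        } catch (IllegalStateException e) {
            System.out.println("rejected without lock");
        }
    }
}
```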
…ase direct dispatcher, also improved cleanup logic.
dtenedor
left a comment
There was a problem hiding this comment.
Reviewed again, everything LGTM this round.
LGTM, merging to master
What changes were proposed in this pull request?
Adds the engine-side foundation for the language-agnostic UDF framework introduced by SPARK-55278. Lives under `udf/worker/core` with no dependency beyond `spark-tags` and the existing `udf/worker/proto`.

Public API in `org.apache.spark.udf.worker.core`:
- `WorkerDispatcher` -- owns workers for one `UDFWorkerSpecification`; hands callers `WorkerSession`s.
- `WorkerSession` -- one UDF execution. `init` (exactly once) and `process` (at most once, after init) are `final` with AtomicBoolean guards; subclasses implement `doInit` / `doProcess`. `InitMessage` is a placeholder until the UDF wire protocol lands.
- `WorkerConnection` -- transport channel; one server-side endpoint shared by all sessions. `UnixSocketWorkerConnection` -- UDS subclass; owns the socket path and removes the file on close.
- `WorkerSecurityScope`, `WorkerLogger` -- pool-partitioning identity and minimal logging surface (no SLF4J dep).

Direct-creation impl in `.direct`:
- `DirectWorkerDispatcher` (abstract, transport-agnostic) -- process spawn, environment lifecycle (installation / environment_verification / environment_cleanup), worker registry, race-safe close. Five protected hooks for transport plug-in: `newEndpointAddress`, `waitForReady`, `cleanupEndpointAddress`, `closeTransport`, `validateTransportSupport`.
- `DirectUnixSocketWorkerDispatcher` (abstract) -- UDS implementation of the hooks (private 0700 socket dir, file-existence polling, file deletion).
- `DirectWorkerProcess` + `WorkerArtifacts` (process + connection + output log) -- close sends SIGTERM, waits `gracefulTimeoutMs`, then artifacts handle connection close + SIGKILL+reap + file cleanup.
- `DirectWorkerException` / `DirectWorkerTimeoutException` -- runtime-failure types; the timeout subclass lets callers discriminate retry policy.

Hardening:
- Idempotent `close()`; acquire-before-publish + post-publish `closed` re-check makes `createSession` racing with `close` either throw or get a session whose worker is reaped.
- Environment install failures are cached as `Failed` permanently.

Proto:
- `UDFWorkerProperties.connection` is singular (one server-side endpoint shared by sessions).
- `WorkerConnectionSpec` proto message renamed to disambiguate from the core Scala `WorkerConnection`.

Notably out of scope (TODOs in code): connection pooling, security-scope partitioning, indirect creation (daemon), TCP transport (a sibling subclass implementing the same five hooks), retriable-vs-permanent failure classification, shared shutdown hook across dispatchers.
Why are the changes needed?
This is the Spark-independent foundation the rest of the language-agnostic UDF work will build on. The dispatcher is already factored to support transports beyond UDS.
Does this PR introduce any user-facing change?
No. The module is not wired into any planner/executor path; classes are `@Experimental`.

How was this patch tested?
`DirectWorkerDispatcherSuite` (31 tests) covering: worker spawn / SIGTERM / SIGKILL escalation; concurrent createSession (distinct processes and socket paths); the close-vs-in-flight-createSession race; createSession-after-close rejection; socket-dir 0700 + cleanup; init / callable / graceful timeout caps and behaviour; environment lifecycle (skip-install-on-verify, install-once-under-concurrency, cached failure, install-timeout permanence, cleanup on close); spec validation (missing/non-UDS connection, verify-without-install, securityScope).
build/mvn test -pl udf/worker/core -am
Was this patch authored or co-authored using generative AI tooling?
Yes.