[KYUUBI #7379][1/4] Data Agent Engine: module skeleton, configuration, and engine core #7385
wangzhigang1999 wants to merge 4 commits into apache:master
…iguration, and engine core
- Add sqlite-jdbc test dependency to data-agent-engine pom.xml
- Remove DataAgentMockLlmSuite, which depends on MockLlmProvider from a later PR
Pull request overview
Note
Copilot was unable to run its full agentic suite in this review.
Introduces the initial, runnable skeleton for the new DATA_AGENT engine type, including configuration entries, process launching, engine/session/operation plumbing, and an Echo provider for end-to-end validation.
Changes:
- Adds `DATA_AGENT` as a new `EngineType` and wires `EngineRef` + a dedicated `DataAgentProcessBuilder` (with API key redaction).
- Creates the `externals/kyuubi-data-agent-engine` module with core engine services, session manager/impl, operations, streaming iterator, and event model.
- Updates distribution packaging and documents the new configuration settings; adds unit/integration tests for streaming and provider behavior.
Reviewed changes
Copilot reviewed 45 out of 45 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| pom.xml | Registers the new externals/kyuubi-data-agent-engine Maven module. |
| kyuubi-server/src/test/scala/org/apache/kyuubi/engine/dataagent/DataAgentProcessBuilderSuite.scala | Adds tests for Data Agent process command rendering and redaction. |
| kyuubi-server/src/main/scala/org/apache/kyuubi/engine/dataagent/DataAgentProcessBuilder.scala | Implements engine process builder, classpath logic, and API key redaction in toString. |
| kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala | Wires DATA_AGENT engine creation + datasource-based subdomain isolation and ZK fallback JDBC URL. |
| kyuubi-common/src/main/scala/org/apache/kyuubi/engine/EngineType.scala | Adds DATA_AGENT engine type constant. |
| kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala | Introduces kyuubi.engine.data.agent.* and kyuubi.frontend.data.agent.* configuration entries. |
| externals/kyuubi-data-agent-engine/pom.xml | Defines the new Data Agent engine module build and test dependencies. |
| externals/kyuubi-data-agent-engine/src/main/scala/org/apache/kyuubi/engine/dataagent/DataAgentEngine.scala | Adds Data Agent engine entrypoint and lifecycle wiring. |
| externals/kyuubi-data-agent-engine/src/main/scala/org/apache/kyuubi/engine/dataagent/DataAgentBackendService.scala | Adds backend service with Data Agent session manager. |
| externals/kyuubi-data-agent-engine/src/main/scala/org/apache/kyuubi/engine/dataagent/DataAgentTBinaryFrontendService.scala | Adds Thrift binary frontend for the Data Agent engine. |
| externals/kyuubi-data-agent-engine/src/main/scala/org/apache/kyuubi/engine/dataagent/session/DataAgentSessionManager.scala | Adds session manager and provider lifecycle management. |
| externals/kyuubi-data-agent-engine/src/main/scala/org/apache/kyuubi/engine/dataagent/session/DataAgentSessionImpl.scala | Implements session open/close delegating to provider. |
| externals/kyuubi-data-agent-engine/src/main/scala/org/apache/kyuubi/engine/dataagent/operation/DataAgentOperation.scala | Implements streaming-friendly fetch behavior and result schema. |
| externals/kyuubi-data-agent-engine/src/main/scala/org/apache/kyuubi/engine/dataagent/operation/DataAgentOperationManager.scala | Routes statements to approval or execution operations. |
| externals/kyuubi-data-agent-engine/src/main/scala/org/apache/kyuubi/engine/dataagent/operation/ExecuteStatement.scala | Runs provider asynchronously and converts events into streaming JSON rows. |
| externals/kyuubi-data-agent-engine/src/main/scala/org/apache/kyuubi/engine/dataagent/operation/ApproveToolCall.scala | Adds synchronous approval/deny command operation returning a JSON status row. |
| externals/kyuubi-data-agent-engine/src/main/scala/org/apache/kyuubi/engine/dataagent/operation/IncrementalFetchIterator.scala | Adds thread-safe incremental fetch iterator with compaction. |
| externals/kyuubi-data-agent-engine/src/main/scala/org/apache/kyuubi/engine/dataagent/schema/SchemaHelper.scala | Provides minimal Thrift schema helpers for string results. |
| externals/kyuubi-data-agent-engine/src/main/scala/org/apache/kyuubi/engine/dataagent/schema/DataAgentTRowSetGenerator.scala | Adds TRowSetGenerator implementation for string columns. |
| externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/provider/DataAgentProvider.java | Defines provider SPI and dynamic loading from config. |
| externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/provider/ProviderRunRequest.java | Adds per-request parameters (question/model/approval mode). |
| externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/provider/echo/EchoProvider.java | Implements Echo provider emitting a realistic event stream for tests. |
| externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/runtime/event/*.java | Adds event types and event classes used for streaming and tool lifecycle. |
| externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/tool/ToolRiskLevel.java | Adds risk-level enum for approval policy. |
| externals/kyuubi-data-agent-engine/src/test/scala/org/apache/kyuubi/engine/dataagent/WithDataAgentEngine.scala | Adds shared engine start/stop helper for Data Agent tests. |
| externals/kyuubi-data-agent-engine/src/test/scala/org/apache/kyuubi/engine/dataagent/operation/IncrementalFetchIteratorSuite.scala | Adds concurrency and positioning tests for iterator. |
| externals/kyuubi-data-agent-engine/src/test/scala/org/apache/kyuubi/engine/dataagent/operation/IncrementalFetchIteratorCompactionSuite.scala | Adds compaction correctness tests. |
| externals/kyuubi-data-agent-engine/src/test/scala/org/apache/kyuubi/engine/dataagent/operation/DataAgentOperationSuite.scala | Adds JDBC round-trip tests through Echo provider. |
| externals/kyuubi-data-agent-engine/src/test/scala/org/apache/kyuubi/engine/dataagent/operation/DataAgentE2ESuite.scala | Adds opt-in E2E test using real LLM provider + SQLite. |
| externals/kyuubi-data-agent-engine/src/test/scala/org/apache/kyuubi/engine/dataagent/operation/ApprovalWorkflowIntegrationSuite.scala | Adds integration tests for approve/deny routing and behavior. |
| externals/kyuubi-data-agent-engine/src/test/java/org/apache/kyuubi/engine/dataagent/provider/echo/EchoProviderTest.java | Verifies Echo provider event sequence and content reconstruction. |
| externals/kyuubi-data-agent-engine/src/test/java/org/apache/kyuubi/engine/dataagent/runtime/event/EventTest.java | Validates event immutability, truncation, and SSE name uniqueness. |
| docs/configuration/settings.md | Documents new Data Agent configuration settings. |
| build/dist | Packages the new Data Agent engine jars into the binary distribution. |
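
The table describes `IncrementalFetchIterator` as a thread-safe incremental fetch iterator with compaction. The actual class is not shown in this conversation, so the following is only a rough Java sketch of that general technique under stated assumptions: the class name `IncrementalBufferSketch`, its methods, and the compaction threshold are all hypothetical and are not the PR's API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: one producer appends rows, one consumer fetches them
// incrementally, and already-consumed rows are dropped once enough accumulate.
final class IncrementalBufferSketch<T> {
    private final List<T> buffer = new ArrayList<>();
    private final int compactThreshold; // assumed tuning knob, not a real Kyuubi setting
    private long base = 0;      // absolute index of buffer.get(0)
    private long position = 0;  // absolute index of the next row to hand out
    private boolean done = false;

    IncrementalBufferSketch(int compactThreshold) {
        this.compactThreshold = compactThreshold;
    }

    // Producer side: add a row and wake any waiting consumer.
    synchronized void append(T row) {
        buffer.add(row);
        notifyAll();
    }

    // Producer side: signal that no more rows will arrive.
    synchronized void finish() {
        done = true;
        notifyAll();
    }

    // Consumer side: wait up to timeoutMs for new rows, then return at most maxRows.
    synchronized List<T> fetchNext(int maxRows, long timeoutMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (position >= base + buffer.size() && !done) {
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0) {
                break;
            }
            try {
                wait(remaining);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        int from = (int) (position - base);
        int to = Math.min(buffer.size(), from + maxRows);
        List<T> out = new ArrayList<>(buffer.subList(from, to));
        position += out.size();
        compact();
        return out;
    }

    // Number of rows currently held in memory.
    synchronized int retained() {
        return buffer.size();
    }

    // Drop consumed rows once at least compactThreshold of them have piled up.
    private void compact() {
        int consumed = (int) (position - base);
        if (consumed >= compactThreshold) {
            buffer.subList(0, consumed).clear();
            base += consumed;
        }
    }
}
```

Tracking an absolute `base` offset is what keeps positions stable after compaction, which is presumably why the PR tests position correctness separately for the compaction case.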
Comments suppressed due to low confidence (2)
kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala:1
- `"%02x".format(_)` on a signed `Byte` can produce 8-hex-digit chunks (due to sign extension), making `hex` longer and unstable (e.g., "ffffffff" instead of "ff"). Mask each byte with `0xff` before formatting (e.g., format `b & 0xff`) to ensure fixed 2-hex-digit output per byte.

externals/kyuubi-data-agent-engine/src/test/scala/org/apache/kyuubi/engine/dataagent/WithDataAgentEngine.scala:1
- The test helper mutates global JVM `System` properties and a shared `DataAgentEngine.kyuubiConf` but does not restore previous values, which can leak configuration across test suites and cause order-dependent failures. Capture prior values and restore/clear them in `afterAll` (or isolate by constructing a fresh `KyuubiConf` per suite and avoiding global `System.setProperty` where possible).
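
For the hex-formatting comment on `EngineRef.scala` above, here is a minimal Java sketch of the masking approach it suggests. The class and method names are made up for illustration. Whether the unmasked form misbehaves depends on how the value reaches the formatter: the sign-extended output shown here appears when the byte has already been widened to an `int`.

```java
final class HexFormatSketch {
    // Formats each byte as exactly two lowercase hex digits.
    static String toHex(byte[] bytes) {
        StringBuilder sb = new StringBuilder(bytes.length * 2);
        for (byte b : bytes) {
            // Mask to the range 0..255 so the formatter never sees a negative int.
            sb.append(String.format("%02x", b & 0xff));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        byte negative = (byte) 0xff; // -1 as a signed byte
        System.out.println(toHex(new byte[] {negative, 0x0a}));    // prints "ff0a"
        System.out.println(String.format("%02x", (int) negative)); // prints "ffffffff"
    }
}
```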
…or to TOOL_RESULT
Hi @pan3793, gentle ping for review when you have a moment. This is the first PR (1/4) of the Data Agent Engine series (#7379). It covers the module skeleton, configuration entries, engine process builder, and an Echo provider for end-to-end validation. CI is all green and Copilot's review comments have been addressed. Happy to adjust anything based on your feedback. Thanks!

      buffer
    }

    override def toString: String = {

Could you please paste a demo launch command for the data agent engine?
Here's the real launch command from a local run (API key redacted by redactConfValues):
2026-04-10 14:58:04.968 INFO KyuubiSessionManager-exec-pool: Thread-86 org.apache.kyuubi.engine.EngineRef: Launching engine:
/Users/zhigang/Library/Java/JavaVirtualMachines/azul-11.0.23/Contents/Home/bin/java \
-Xmx1g \
-Dfile.encoding=UTF-8 \
-Dkyuubi.frontend.thrift.binary.bind.host=127.0.0.1 \
-cp /Users/zhigang/IdeaProjects/kyuubi/externals/kyuubi-data-agent-engine/target/kyuubi-data-agent-engine_2.12-1.12.0-SNAPSHOT.jar:/Users/zhigang/IdeaProjects/kyuubi/externals/kyuubi-data-agent-engine/target/scala-2.12/jars/* \
org.apache.kyuubi.engine.dataagent.DataAgentEngine \
--conf kyuubi.session.user=anonymous \
--conf kyuubi.engine.id=d8d3181a-ba44-4335-a788-f475330caf21 \
--conf kyuubi.client.ipAddress=127.0.0.1 \
--conf kyuubi.engine.appMgrInfo=eyJyZXNvdXJjZU1hbmFnZXIiOm51bGwsImt1YmVybmV0ZXNJbmZvIjp7ImNvbnRleHQiOm51bGwsIm5hbWVzcGFjZSI6bnVsbH19 \
--conf kyuubi.engine.data.agent.java.options=-Dkyuubi.frontend.thrift.binary.bind.host=127.0.0.1 \
--conf kyuubi.engine.data.agent.jdbc.url=jdbc:sqlite:/Users/zhigang/PycharmProjects/data-insight/benchmarks/bird/minidev/MINIDEV/dev_databases/california_schools/california_schools.sqlite \
--conf kyuubi.engine.data.agent.llm.api.key=*********(redacted) \
--conf kyuubi.engine.data.agent.llm.api.url=https://dashscope.aliyuncs.com/compatible-mode/v1 \
--conf kyuubi.engine.data.agent.llm.model=qwen3.5-plus-2026-02-15 \
--conf kyuubi.engine.data.agent.max.iterations=100 \
--conf kyuubi.engine.data.agent.provider=openai_compatible \
--conf kyuubi.engine.submit.time=1775804284939 \
--conf kyuubi.engine.type=DATA_AGENT \
--conf kyuubi.ha.addresses=30.246.181.34:2181 \
--conf kyuubi.ha.engine.ref.id=d8d3181a-ba44-4335-a788-f475330caf21 \
--conf kyuubi.ha.namespace=/kyuubi_1.12.0-SNAPSHOT_USER_DATA_AGENT/anonymous/ds-2f23fd339a886fe9 \
--conf kyuubi.ha.zookeeper.auth.type=NONE \
--conf kyuubi.server.ipAddress=0.0.0.0 \
--conf kyuubi.session.connection.url=0.0.0.0:10099 \
--conf kyuubi.session.real.user=anonymous
BTW, there seems to be a bug in command redaction in the current versions of Kyuubi, reported in #7387.
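
To make the redaction step in the launch command above concrete, here is a small Java sketch of masking sensitive `--conf` values before a command is logged. It is an illustration only and not Kyuubi's `redactConfValues`: the class name, the key pattern, and the rendering format are assumptions, and only the `*********(redacted)` mask text is taken from the log above.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Pattern;

final class ConfRedactionSketch {
    // Mask text copied from the launch log; everything else here is hypothetical.
    static final String MASK = "*********(redacted)";

    // Assumed pattern for keys whose values should never be printed.
    static final Pattern SENSITIVE =
        Pattern.compile("(?i).*(api\\.key|password|secret).*");

    // Renders conf entries as "--conf key=value" lines, masking sensitive values.
    static String render(Map<String, String> conf) {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, String> e : conf.entrySet()) {
            boolean sensitive = SENSITIVE.matcher(e.getKey()).matches();
            String value = sensitive ? MASK : e.getValue();
            if (sb.length() > 0) {
                sb.append(" \\\n"); // shell-style line continuation
            }
            sb.append("--conf ").append(e.getKey()).append('=').append(value);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        Map<String, String> conf = new LinkedHashMap<>();
        conf.put("kyuubi.engine.data.agent.llm.api.key", "sk-example");
        conf.put("kyuubi.engine.data.agent.provider", "openai_compatible");
        System.out.println(render(conf));
    }
}
```

Redacting only at render time leaves the real value available to the launched process while keeping it out of logs and `toString` output.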

    @VisibleForTesting
    private[kyuubi] val subdomain: String = conf.get(ENGINE_SHARE_LEVEL_SUBDOMAIN) match {
      // DATA_AGENT engines must be isolated by datasource (JDBC URL). This takes precedence over

Thanks for pushing on this — "must" really was under-motivated, let me walk through the thinking.
Starting from what's actually in this PR, the provider SPI is single-datasource by contract:
- `DataAgentProvider.load(KyuubiConf)` is called once per engine from engine-level conf.
- `ENGINE_DATA_AGENT_JDBC_URL` is a single `OptionalConfigEntry[String]`.
- `ProviderRunRequest` carries `question` / `modelName` / `approvalMode`, with no datasource field.
So a provider instance is bound to exactly one JDBC URL when it's loaded, and a caller has no way to dispatch a request against a different URL. The EngineRef.subdomain branch is just the routing side of that same contract, keeping the two halves in sync.
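
As an illustration of the routing side, the sketch below derives a per-datasource subdomain from a JDBC URL so that sessions with different URLs land on different engines. The `ds-` prefix plus 16 hex characters mirrors the namespace in the launch log (`ds-2f23fd339a886fe9`), but the hash algorithm, the truncation, and the class and method names are assumptions and not necessarily what `EngineRef` does.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

final class DatasourceSubdomainSketch {
    // Maps a JDBC URL to a stable, short routing key such as "ds-0123456789abcdef".
    // Assumption: SHA-256 truncated to 8 bytes; the real hash may differ.
    static String subdomainFor(String jdbcUrl) {
        try {
            MessageDigest md = MessageDigest.getInstance("SHA-256");
            byte[] digest = md.digest(jdbcUrl.getBytes(StandardCharsets.UTF_8));
            StringBuilder sb = new StringBuilder("ds-");
            for (int i = 0; i < 8; i++) {
                sb.append(String.format("%02x", digest[i] & 0xff));
            }
            return sb.toString();
        } catch (NoSuchAlgorithmException e) {
            // SHA-256 is a mandatory algorithm on every Java platform.
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(subdomainFor("jdbc:sqlite:/tmp/a.sqlite"));
        System.out.println(subdomainFor("jdbc:sqlite:/tmp/b.sqlite"));
    }
}
```

Because the key is a pure function of the URL, two sessions that name the same datasource resolve to the same engine, while any other URL resolves to a different one.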
As for why the SPI is shaped that way: I tried the multi-datasource version in a previous project and ran into real pain — SQL dialect divergence (tools built around one dialect don't transfer cleanly to another), connection-pool and credential isolation, schema introspection cost, and the general awkwardness of one process quietly talking to several production databases. The takeaway for me was that binding a process to a single datasource makes tool design, prompt construction, and credential handling a lot simpler, so I carried that lesson into the SPI here.
Multi-datasource isn't ruled out forever — it would just mean rethinking the SPI (per-request datasource, or a provider keyed by datasource), the config surface, and the routing together, which feels like its own discussion. Happy to revisit if a concrete use case comes up.
On the comment itself, I'll tighten it to say the why directly:
// A DATA_AGENT engine is bound 1:1 to a JDBC datasource because the provider SPI
// is constructed once per engine from a single kyuubi.engine.data.agent.jdbc.url,
// and ProviderRunRequest carries no datasource field — there is no way to dispatch
// a request against a different JDBC URL than the one the provider was loaded
// with. Sessions targeting different datasources must therefore route to distinct
// engines.

WDYT?
Let's update it to your proposed comment for now; then I will merge this PR to unblock your subsequent PRs. We can revise it later if we find any issues.

    def isApprovalCommand(statement: String): Boolean = {
      val trimmed = statement.trim
      trimmed.startsWith(APPROVE_PREFIX) || trimmed.startsWith(DENY_PREFIX)

Can you share a little more about the design details here? Why does `isApproval*` return true for `DENY_PREFIX`?
Sure. The goal is to let a human approve or deny risky tool calls (for example a destructive SQL statement) without adding any new RPC — everything stays on the normal executeStatement / fetchResults path, so plain JDBC and Thrift clients work as-is. Here's the full round trip:
sequenceDiagram
autonumber
participant C as Client
participant OM as DataAgentOperationManager
participant ES as ExecuteStatement<br/>(async)
participant P as Provider<br/>(agent + approval)
participant AT as ApproveToolCall<br/>(sync)
C->>OM: executeStatement("what is ...")
OM->>ES: new ExecuteStatement
ES->>P: run(request, onEvent)
P-->>ES: content_delta events
ES-->>C: fetchResults (JSON rows)
Note over P: agent hits a risky tool call<br/>put(requestId, future)
P-->>ES: approval_request(requestId)
ES-->>C: fetchResults (approval row)
Note over P: agent thread blocks on<br/>future.get(timeout)
C->>OM: executeStatement("__approve:<requestId>")
OM->>AT: isApprovalCommand ⇒ route to ApproveToolCall
AT->>P: resolveApproval(requestId, true)
P-->>P: future.complete(true)
AT-->>C: {status:"ok", action, requestId}
Note over P: agent thread resumes
P-->>ES: tool_result / content_delta / finished
ES-->>C: fetchResults (remaining rows)
A couple of choices worth calling out:
ApproveToolCall is synchronous (shouldRunAsync = false) because it's just a control-plane ping — there's nothing to stream, so async would only add an extra state transition for no benefit. Staying on the executeStatement path means we don't touch the Thrift IDL and every existing client keeps working. The __approve: / __deny: prefixes are an easy sentinel since natural-language questions don't start with __.
The pending-request map lives inside the provider, not in the operation manager, because the agent runtime is what actually blocks on the tool call — the future belongs next to the thing that's waiting on it. The operation layer just forwards resolveApproval and doesn't need to know about tool-call semantics or timeouts, which keeps the DataAgentOperation* classes free of agent-specific state.
Scope for this PR: what lands here is the SPI (DataAgentProvider.resolveApproval), the routing, and ApproveToolCall itself. The actual parking mechanism inside a real provider comes with the LLM-backed provider in the follow-up PR. Right now the default resolveApproval returns false, so an unknown or expired requestId surfaces as {"status":"not_found"} to the client.
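
Since the real parking mechanism only lands in the follow-up PR, the following Java sketch is purely hypothetical. It shows one way a provider could keep the pending-request map described above, with the agent side blocking on a future and `resolveApproval` completing it. Only the method name `resolveApproval(requestId, approved)` comes from this thread; the class name, the other methods, and the choice to treat a timeout as a denial are assumptions.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class PendingApprovalsSketch {
    private final ConcurrentHashMap<String, CompletableFuture<Boolean>> pending =
        new ConcurrentHashMap<>();

    // Agent side: register a pending request before emitting approval_request.
    CompletableFuture<Boolean> register(String requestId) {
        CompletableFuture<Boolean> future = new CompletableFuture<>();
        pending.put(requestId, future);
        return future;
    }

    // Agent side: block until a decision arrives; a timeout counts as a denial here.
    boolean await(String requestId, CompletableFuture<Boolean> future, long timeoutMs) {
        try {
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } catch (ExecutionException | TimeoutException e) {
            return false;
        } finally {
            pending.remove(requestId, future); // expire the request either way
        }
    }

    // Control side: returns false for an unknown or expired requestId.
    boolean resolveApproval(String requestId, boolean approved) {
        CompletableFuture<Boolean> future = pending.remove(requestId);
        if (future == null) {
            return false;
        }
        future.complete(approved);
        return true;
    }
}
```

In this shape the operation layer only needs the boolean returned by `resolveApproval` to choose between an "ok" and a "not_found" status row.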
> Can you share a little more about the design details here? Why does `isApproval*` return true for `DENY_PREFIX`?
isApprovalCommand is a routing predicate — "is this statement part of the approval control channel?", not "did the user approve?". __approve: and __deny: are two outcomes on the same channel, so both route to ApproveToolCall, which then decodes the prefix into a boolean and passes it to resolveApproval(requestId, approved) to unblock the parked agent thread.
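
A compact Java sketch of that split between routing and decoding follows. The `__approve:` / `__deny:` prefixes and the `isApprovalCommand` name come from this thread; the `Decision` holder and the `decode` method are hypothetical stand-ins for what `ApproveToolCall` does with the statement.

```java
final class ApprovalCommandSketch {
    static final String APPROVE_PREFIX = "__approve:";
    static final String DENY_PREFIX = "__deny:";

    // Hypothetical value holder for a decoded control statement.
    static final class Decision {
        final boolean approved;
        final String requestId;

        Decision(boolean approved, String requestId) {
            this.approved = approved;
            this.requestId = requestId;
        }
    }

    // Routing predicate: true for both outcomes on the approval channel.
    static boolean isApprovalCommand(String statement) {
        String trimmed = statement.trim();
        return trimmed.startsWith(APPROVE_PREFIX) || trimmed.startsWith(DENY_PREFIX);
    }

    // Decoding step: turns the prefix into the boolean passed on to resolveApproval.
    static Decision decode(String statement) {
        String trimmed = statement.trim();
        if (trimmed.startsWith(APPROVE_PREFIX)) {
            return new Decision(true, trimmed.substring(APPROVE_PREFIX.length()).trim());
        }
        if (trimmed.startsWith(DENY_PREFIX)) {
            return new Decision(false, trimmed.substring(DENY_PREFIX.length()).trim());
        }
        throw new IllegalArgumentException("Not an approval command: " + statement);
    }
}
```

For example, `decode(" __deny:abc ")` yields `approved == false` with `requestId` `"abc"`, while an ordinary question fails the predicate and goes to `ExecuteStatement` instead.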

    package org.apache.kyuubi.engine.dataagent.runtime.event;

    /**
     * Enumerates the types of events emitted by the ReAct agent loop. Each value maps to a

Can you add a reference link to the ReAct agent loop?
Good catch — the concrete ReactAgent class that actually drives these events lives in the LLM-backed provider, which lands in the follow-up PR (2/4 of the series). I'll add the {@link ReactAgent} / @see from EventType in that PR, so the enum and the loop implementation live next to each other and stay in sync. Keeping this PR's Javadoc untouched for now to avoid a dangling reference.
@wangzhigang1999 Good job! Thank you for kicking off the data agent engine development work!
Explain why a DATA_AGENT engine is bound 1:1 to a JDBC datasource: the provider SPI is constructed once per engine from a single jdbc.url and ProviderRunRequest carries no datasource field, so sessions targeting different datasources must route to distinct engines. Addresses review comment from @pan3793 on apache#7385.
Thanks, merged to master.
Why are the changes needed?
Part 1 of 4 for the Data Agent Engine (umbrella, KPIP-7373).
This PR adds a new `DATA_AGENT` engine type with the module skeleton fully runnable via the Echo provider. It establishes the foundation for subsequent PRs (tool system, agent runtime, REST API, and Web UI).

Changes include:
- `pom.xml`, `build/dist` packaging
- `kyuubi.engine.data.agent.*` configuration entries
- `DATA_AGENT` engine type, `EngineRef` branch, `DataAgentProcessBuilder` with API key redaction
- `DataAgentEngine`, `BackendService`, `TBinaryFrontendService`, `SessionManager`/`Impl`, `OperationManager`
- `DataAgentOperation`, `ExecuteStatement`: async agent execution with event-to-JSON conversion
- `IncrementalFetchIterator`: thread-safe producer/consumer streaming with compaction
- `AgentEvent`, `EventType`, all lifecycle/content/tool events
- `EchoProvider`: test provider that echoes input with a proper event sequence

How was this patch tested?
- `IncrementalFetchIteratorSuite`: basic ops, position tracking, concurrent producer/consumer safety
- `IncrementalFetchIteratorCompactionSuite`: compaction trigger, position correctness, `fetchAbsolute` after compaction
- `DataAgentOperationSuite`: JDBC round-trip with Echo provider, multiple queries in the same session
- `EchoProviderTest`: event sequence validation and content echo
- `EventTest`: immutability, truncation, null handling, SSE name uniqueness
- `DataAgentProcessBuilderSuite`: main class, API key redaction, memory, classpath, java options

Was this patch authored or co-authored using generative AI tooling?
Partially assisted by Claude Code (Claude Opus 4.6) for test deduplication, code style fixes, and PR splitting. Core design and implementation are human-authored.