Skip to content

Commit a94cdda

Browse files
wangzhigang1999pan3793
authored andcommitted
[KYUUBI #7379][1/4] Data Agent Engine: module skeleton, configuration, and engine core
### Why are the changes needed? Part 1 of 4 for the Data Agent Engine ([umbrella](#7379), [KPIP-7373](#7373)). This PR adds a new `DATA_AGENT` engine type with the module skeleton fully runnable via Echo provider. It establishes the foundation for subsequent PRs (tool system, agent runtime, REST API, and Web UI). Changes include: - Module `pom.xml`, `build/dist` packaging - All `kyuubi.engine.data.agent.*` configuration entries - `DATA_AGENT` engine type, `EngineRef` branch, `DataAgentProcessBuilder` with API key redaction - Engine core: `DataAgentEngine`, `BackendService`, `TBinaryFrontendService`, `SessionManager/Impl`, `OperationManager` - `DataAgentOperation`, `ExecuteStatement` — async agent execution with event-to-JSON conversion - `IncrementalFetchIterator` — thread-safe producer/consumer streaming with compaction - Event system: `AgentEvent`, `EventType`, all lifecycle/content/tool events - `EchoProvider` — test provider that echoes input with proper event sequence ### How was this patch tested? - `IncrementalFetchIteratorSuite` — basic ops, position tracking, concurrent producer/consumer safety - `IncrementalFetchIteratorCompactionSuite` — compaction trigger, position correctness, fetchAbsolute after compaction - `DataAgentOperationSuite` — JDBC round-trip with Echo provider, multiple queries in same session - `EchoProviderTest` — event sequence validation and content echo - `EventTest` — immutability, truncation, null handling, SSE name uniqueness - `DataAgentProcessBuilderSuite` — main class, API key redaction, memory, classpath, java options ### Was this patch authored or co-authored using generative AI tooling? Partially assisted by Claude Code (Claude Opus 4.6) for test deduplication, code style fixes, and PR splitting. Core design and implementation are human-authored. Closes #7385 from wangzhigang1999/pr1/data-agent-skeleton. Closes #7379 396566c [wangzhigang] Clarify DATA_AGENT subdomain isolation rationale in EngineRef 67ca188 [wangzhigang] Fix SSE event serialization: serialize toolArgs as JSON and add isError to TOOL_RESULT db763c0 [wangzhigang] Fix CI: add sqlite-jdbc test dep and remove MockLlmProvider suite b05c88b [wangzhigang] [KYUUBI #7379][PR 1/4] Data Agent Engine: module skeleton, configuration, and engine core Lead-authored-by: wangzhigang <wzg443064@alibaba-inc.com> Co-authored-by: wangzhigang <iamzhigangwang@gmail.com> Signed-off-by: Cheng Pan <chengpan@apache.org>
1 parent bb67bd6 commit a94cdda

45 files changed

Lines changed: 3251 additions & 2 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

build/dist

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -288,6 +288,7 @@ mkdir -p "$DISTDIR/externals/engines/trino"
288288
mkdir -p "$DISTDIR/externals/engines/hive"
289289
mkdir -p "$DISTDIR/externals/engines/jdbc"
290290
mkdir -p "$DISTDIR/externals/engines/chat"
291+
mkdir -p "$DISTDIR/externals/engines/data-agent"
291292
echo "Kyuubi $VERSION $GITREVSTRING built for" > "$DISTDIR/RELEASE"
292293
echo "Java $JAVA_VERSION" >> "$DISTDIR/RELEASE"
293294
echo "Scala $SCALA_VERSION" >> "$DISTDIR/RELEASE"
@@ -363,6 +364,18 @@ for jar in $(ls "$DISTDIR/jars/"); do
363364
fi
364365
done
365366

367+
# Copy data-agent engines
368+
cp "$KYUUBI_HOME/externals/kyuubi-data-agent-engine/target/kyuubi-data-agent-engine_${SCALA_VERSION}-${VERSION}.jar" "$DISTDIR/externals/engines/data-agent/"
369+
cp -r "$KYUUBI_HOME"/externals/kyuubi-data-agent-engine/target/scala-$SCALA_VERSION/jars/*.jar "$DISTDIR/externals/engines/data-agent/"
370+
371+
# Share the jars w/ server to reduce binary size
372+
# shellcheck disable=SC2045
373+
for jar in $(ls "$DISTDIR/jars/"); do
374+
if [[ -f "$DISTDIR/externals/engines/data-agent/$jar" ]]; then
375+
(cd $DISTDIR/externals/engines/data-agent; ln -snf "../../../jars/$jar" "$DISTDIR/externals/engines/data-agent/$jar")
376+
fi
377+
done
378+
366379
# Copy Kyuubi Spark extension
367380
# shellcheck disable=SC2068
368381
for SPARK_EXTENSION_VERSION in ${SPARK_EXTENSION_VERSIONS[@]}; do

docs/configuration/settings.md

Lines changed: 12 additions & 0 deletions
Large diffs are not rendered by default.
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<!--
3+
~ Licensed to the Apache Software Foundation (ASF) under one or more
4+
~ contributor license agreements. See the NOTICE file distributed with
5+
~ this work for additional information regarding copyright ownership.
6+
~ The ASF licenses this file to You under the Apache License, Version 2.0
7+
~ (the "License"); you may not use this file except in compliance with
8+
~ the License. You may obtain a copy of the License at
9+
~
10+
~ http://www.apache.org/licenses/LICENSE-2.0
11+
~
12+
~ Unless required by applicable law or agreed to in writing, software
13+
~ distributed under the License is distributed on an "AS IS" BASIS,
14+
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
~ See the License for the specific language governing permissions and
16+
~ limitations under the License.
17+
-->
18+
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
19+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
20+
<modelVersion>4.0.0</modelVersion>
21+
<parent>
22+
<groupId>org.apache.kyuubi</groupId>
23+
<artifactId>kyuubi-parent</artifactId>
24+
<version>1.12.0-SNAPSHOT</version>
25+
<relativePath>../../pom.xml</relativePath>
26+
</parent>
27+
28+
<artifactId>kyuubi-data-agent-engine_${scala.binary.version}</artifactId>
29+
<packaging>jar</packaging>
30+
<name>Kyuubi Project Engine Data Agent</name>
31+
<url>https://kyuubi.apache.org/</url>
32+
33+
<dependencies>
34+
<!-- kyuubi dependency -->
35+
<dependency>
36+
<groupId>org.apache.kyuubi</groupId>
37+
<artifactId>kyuubi-common_${scala.binary.version}</artifactId>
38+
<version>${project.version}</version>
39+
</dependency>
40+
41+
<dependency>
42+
<groupId>org.apache.kyuubi</groupId>
43+
<artifactId>kyuubi-ha_${scala.binary.version}</artifactId>
44+
<version>${project.version}</version>
45+
</dependency>
46+
47+
<dependency>
48+
<groupId>org.apache.kyuubi</groupId>
49+
<artifactId>${hive.jdbc.artifact}</artifactId>
50+
<version>${project.version}</version>
51+
</dependency>
52+
53+
<!-- test dependencies -->
54+
<dependency>
55+
<groupId>org.apache.kyuubi</groupId>
56+
<artifactId>kyuubi-common_${scala.binary.version}</artifactId>
57+
<version>${project.version}</version>
58+
<type>test-jar</type>
59+
<scope>test</scope>
60+
</dependency>
61+
62+
<dependency>
63+
<groupId>org.xerial</groupId>
64+
<artifactId>sqlite-jdbc</artifactId>
65+
<scope>test</scope>
66+
</dependency>
67+
68+
<dependency>
69+
<groupId>junit</groupId>
70+
<artifactId>junit</artifactId>
71+
<scope>test</scope>
72+
</dependency>
73+
</dependencies>
74+
75+
<build>
76+
<plugins>
77+
<plugin>
78+
<groupId>org.apache.maven.plugins</groupId>
79+
<artifactId>maven-surefire-plugin</artifactId>
80+
<configuration>
81+
<skipTests>${skipTests}</skipTests>
82+
<argLine>${extraJavaTestArgs}</argLine>
83+
</configuration>
84+
</plugin>
85+
<plugin>
86+
<groupId>org.apache.maven.plugins</groupId>
87+
<artifactId>maven-jar-plugin</artifactId>
88+
<executions>
89+
<execution>
90+
<id>prepare-test-jar</id>
91+
<goals>
92+
<goal>test-jar</goal>
93+
</goals>
94+
<phase>test-compile</phase>
95+
</execution>
96+
</executions>
97+
</plugin>
98+
</plugins>
99+
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
100+
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
101+
</build>
102+
103+
</project>
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.kyuubi.engine.dataagent.provider;
19+
20+
import java.util.function.Consumer;
21+
import org.apache.kyuubi.config.KyuubiConf;
22+
import org.apache.kyuubi.engine.dataagent.runtime.event.AgentEvent;
23+
import org.apache.kyuubi.util.reflect.DynConstructors;
24+
25+
/**
26+
* A pluggable provider interface for the Data Agent engine. Implementations wire up the ReactAgent
27+
* with an LLM model, tools, and middlewares.
28+
*/
29+
public interface DataAgentProvider {
30+
31+
/** Initialize a session for the given user. */
32+
void open(String sessionId, String user);
33+
34+
/**
35+
* Run the agent for the given request, emitting events via the consumer. Events include
36+
* token-level ContentDelta for streaming, ToolCall/ToolResult for tool invocations, and
37+
* AgentFinish when complete.
38+
*
39+
* @param sessionId the session identifier (maps to conversation memory)
40+
* @param request user-facing parameters (question, model override, etc.)
41+
* @param onEvent event consumer for streaming events
42+
*/
43+
void run(String sessionId, ProviderRunRequest request, Consumer<AgentEvent> onEvent);
44+
45+
/**
46+
* Close and clean up a single session, releasing session-scoped resources such as conversation
47+
* history and session state. Called when one user session ends.
48+
*/
49+
void close(String sessionId);
50+
51+
/**
52+
* Resolve a pending tool approval request. Called when the client sends an approval or denial
53+
* response for a tool call that requires human-in-the-loop confirmation.
54+
*
55+
* @param requestId the request ID from the ApprovalRequest event
56+
* @param approved true to approve, false to deny
57+
* @return true if the request was found and resolved, false if not found (timed out or invalid)
58+
*/
59+
default boolean resolveApproval(String requestId, boolean approved) {
60+
return false;
61+
}
62+
63+
/**
64+
* Stop the provider itself, releasing engine-level resources shared across all sessions (e.g.
65+
* HTTP connection pools, thread pools). Called once when the entire engine shuts down.
66+
*/
67+
default void stop() {}
68+
69+
static DataAgentProvider load(KyuubiConf conf) {
70+
String providerClass = conf.get(KyuubiConf.ENGINE_DATA_AGENT_PROVIDER());
71+
try {
72+
return (DataAgentProvider)
73+
DynConstructors.builder(DataAgentProvider.class)
74+
.impl(providerClass, KyuubiConf.class)
75+
.impl(providerClass)
76+
.buildChecked()
77+
.newInstanceChecked(conf);
78+
} catch (ClassCastException e) {
79+
throw new IllegalArgumentException(
80+
"Class "
81+
+ providerClass
82+
+ " is not a child of '"
83+
+ DataAgentProvider.class.getName()
84+
+ "'.",
85+
e);
86+
} catch (Exception e) {
87+
throw new IllegalArgumentException("Error while instantiating '" + providerClass + "': ", e);
88+
}
89+
}
90+
}
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.kyuubi.engine.dataagent.provider;
19+
20+
/**
21+
* User-facing request parameters for a provider-level agent invocation. Only contains fields from
22+
* the caller (question, model override, etc.). Adding new per-request options does not require
23+
* changing the {@link DataAgentProvider} interface.
24+
*/
25+
public class ProviderRunRequest {
26+
27+
private final String question;
28+
private String modelName;
29+
private String approvalMode;
30+
31+
public ProviderRunRequest(String question) {
32+
this.question = question;
33+
}
34+
35+
public String getQuestion() {
36+
return question;
37+
}
38+
39+
public String getModelName() {
40+
return modelName;
41+
}
42+
43+
public ProviderRunRequest modelName(String modelName) {
44+
this.modelName = modelName;
45+
return this;
46+
}
47+
48+
public String getApprovalMode() {
49+
return approvalMode;
50+
}
51+
52+
public ProviderRunRequest approvalMode(String approvalMode) {
53+
this.approvalMode = approvalMode;
54+
return this;
55+
}
56+
}
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.kyuubi.engine.dataagent.provider.echo;
19+
20+
import java.util.function.Consumer;
21+
import org.apache.kyuubi.engine.dataagent.provider.DataAgentProvider;
22+
import org.apache.kyuubi.engine.dataagent.provider.ProviderRunRequest;
23+
import org.apache.kyuubi.engine.dataagent.runtime.event.AgentEvent;
24+
import org.apache.kyuubi.engine.dataagent.runtime.event.AgentFinish;
25+
import org.apache.kyuubi.engine.dataagent.runtime.event.AgentStart;
26+
import org.apache.kyuubi.engine.dataagent.runtime.event.ContentComplete;
27+
import org.apache.kyuubi.engine.dataagent.runtime.event.ContentDelta;
28+
import org.apache.kyuubi.engine.dataagent.runtime.event.StepEnd;
29+
import org.apache.kyuubi.engine.dataagent.runtime.event.StepStart;
30+
31+
/** A simple echo provider for testing purposes. Simulates the agent event stream. */
32+
public class EchoProvider implements DataAgentProvider {
33+
34+
@Override
35+
public void open(String sessionId, String user) {}
36+
37+
@Override
38+
public void run(String sessionId, ProviderRunRequest request, Consumer<AgentEvent> onEvent) {
39+
String question = request.getQuestion();
40+
41+
onEvent.accept(new AgentStart());
42+
onEvent.accept(new StepStart(1));
43+
44+
// Simulate token-level streaming
45+
String reply =
46+
"[DataAgent Echo] I received your question: "
47+
+ question
48+
+ "\n"
49+
+ "This is the Data Agent engine in echo mode. "
50+
+ "Please configure an LLM provider (e.g., OPENAI_COMPATIBLE) for actual data analysis.";
51+
for (String token : reply.split("(?<=\\s)")) {
52+
onEvent.accept(new ContentDelta(token));
53+
}
54+
55+
onEvent.accept(new ContentComplete(reply));
56+
onEvent.accept(new StepEnd(1));
57+
onEvent.accept(new AgentFinish(1, 0, 0, 0));
58+
}
59+
60+
@Override
61+
public void close(String sessionId) {}
62+
}
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.kyuubi.engine.dataagent.runtime.event;
19+
20+
/** An error occurred during agent execution. */
21+
public final class AgentError extends AgentEvent {
22+
private final String message;
23+
24+
public AgentError(String message) {
25+
super(EventType.ERROR);
26+
this.message = message;
27+
}
28+
29+
public String message() {
30+
return message;
31+
}
32+
33+
@Override
34+
public String toString() {
35+
return "AgentError{message='" + message + "'}";
36+
}
37+
}

0 commit comments

Comments
 (0)