onEvent) {
+ String question = request.getQuestion();
+
+ onEvent.accept(new AgentStart());
+ onEvent.accept(new StepStart(1));
+
+ // Simulate token-level streaming
+ String reply =
+ "[DataAgent Echo] I received your question: "
+ + question
+ + "\n"
+ + "This is the Data Agent engine in echo mode. "
+ + "Please configure an LLM provider (e.g., OPENAI_COMPATIBLE) for actual data analysis.";
+ for (String token : reply.split("(?<=\\s)")) {
+ onEvent.accept(new ContentDelta(token));
+ }
+
+ onEvent.accept(new ContentComplete(reply));
+ onEvent.accept(new StepEnd(1));
+ onEvent.accept(new AgentFinish(1, 0, 0, 0));
+ }
+
+ @Override
+ public void close(String sessionId) {} // No-op: nothing is done when a session is closed.
+}
diff --git a/externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/runtime/event/AgentError.java b/externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/runtime/event/AgentError.java
new file mode 100644
index 00000000000..38c3ea8b5d6
--- /dev/null
+++ b/externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/runtime/event/AgentError.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.dataagent.runtime.event;
+
+/** An error occurred during agent execution. */
+public final class AgentError extends AgentEvent {
+ private final String message;
+
+ public AgentError(String message) {
+ super(EventType.ERROR);
+ this.message = message;
+ }
+
+ public String message() {
+ return message;
+ }
+
+ @Override
+ public String toString() {
+ return "AgentError{message='" + message + "'}";
+ }
+}
diff --git a/externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/runtime/event/AgentEvent.java b/externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/runtime/event/AgentEvent.java
new file mode 100644
index 00000000000..4bc0fc88ccf
--- /dev/null
+++ b/externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/runtime/event/AgentEvent.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.dataagent.runtime.event;
+
+/**
+ * Base class for events emitted by the ReAct agent loop. Each event represents a discrete step in
+ * the agent's reasoning and execution process, enabling real-time token-level streaming to clients.
+ *
+ * <p>Every subclass must declare its {@link EventType}, which also determines the SSE event name
+ * used on the wire. This allows consumers to {@code switch} on the type rather than using {@code
+ * instanceof} chains.
+ *
+ * <p>Package-private constructor restricts subclassing to this package.
+ */
+public abstract class AgentEvent {
+
+ private final EventType eventType;
+ /** Package-private: records the {@link EventType} passed in by the concrete subclass. */
+ AgentEvent(EventType eventType) {
+ this.eventType = eventType;
+ }
+
+ /** Returns the type of this event, exactly as passed to the constructor. */
+ public EventType eventType() {
+ return eventType;
+ }
+}
diff --git a/externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/runtime/event/AgentFinish.java b/externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/runtime/event/AgentFinish.java
new file mode 100644
index 00000000000..d9f962bbf0a
--- /dev/null
+++ b/externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/runtime/event/AgentFinish.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.dataagent.runtime.event;
+
+/** Event signalling that the agent has finished its analysis; carries step and token counters. */
+public final class AgentFinish extends AgentEvent {
+  private final int totalSteps;
+  private final long promptTokens;
+  private final long completionTokens;
+  private final long totalTokens;
+
+  /** Creates an {@link EventType#AGENT_FINISH} event; the four counters are stored as given. */
+  public AgentFinish(int totalSteps, long promptTokens, long completionTokens, long totalTokens) {
+    super(EventType.AGENT_FINISH);
+    this.totalSteps = totalSteps;
+    this.promptTokens = promptTokens;
+    this.completionTokens = completionTokens;
+    this.totalTokens = totalTokens;
+  }
+
+  /** Returns the step count supplied at construction. */
+  public int totalSteps() {
+    return totalSteps;
+  }
+
+  /** Returns the prompt-token count supplied at construction. */
+  public long promptTokens() {
+    return promptTokens;
+  }
+
+  /** Returns the completion-token count supplied at construction. */
+  public long completionTokens() {
+    return completionTokens;
+  }
+
+  /** Returns the total-token count supplied at construction. */
+  public long totalTokens() {
+    return totalTokens;
+  }
+
+  /** Lists all four counters in declaration order. */
+  @Override
+  public String toString() {
+    return String.format(
+        "AgentFinish{totalSteps=%s, promptTokens=%s, completionTokens=%s, totalTokens=%s}",
+        totalSteps, promptTokens, completionTokens, totalTokens);
+  }
+}
diff --git a/externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/runtime/event/AgentStart.java b/externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/runtime/event/AgentStart.java
new file mode 100644
index 00000000000..f7cc16ae9b0
--- /dev/null
+++ b/externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/runtime/event/AgentStart.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.dataagent.runtime.event;
+
+/** Payload-free event: the agent has started processing a user query. */
+public final class AgentStart extends AgentEvent {
+ /** Creates the event with type {@link EventType#AGENT_START}. */
+ public AgentStart() {
+ super(EventType.AGENT_START);
+ }
+ /** Returns the constant string {@code AgentStart{}}. */
+ @Override
+ public String toString() {
+ return "AgentStart{}";
+ }
+}
diff --git a/externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/runtime/event/ApprovalRequest.java b/externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/runtime/event/ApprovalRequest.java
new file mode 100644
index 00000000000..81d01c72fd2
--- /dev/null
+++ b/externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/runtime/event/ApprovalRequest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.dataagent.runtime.event;
+
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import org.apache.kyuubi.engine.dataagent.tool.ToolRiskLevel;
+
+/**
+ * Emitted when the agent needs user approval before executing a tool. The client should show the
+ * request to the user and reply with an approval or a denial through the approval channel.
+ */
+public final class ApprovalRequest extends AgentEvent {
+  private final String requestId;
+  private final String toolCallId;
+  private final String toolName;
+  private final Map toolArgs;
+  private final ToolRiskLevel riskLevel;
+
+  /**
+   * Creates the event; {@code toolArgs} is copied into an unmodifiable map (null -> empty map).
+   */
+  public ApprovalRequest(
+      String requestId,
+      String toolCallId,
+      String toolName,
+      Map toolArgs,
+      ToolRiskLevel riskLevel) {
+    super(EventType.APPROVAL_REQUEST);
+    this.requestId = requestId;
+    this.toolCallId = toolCallId;
+    this.toolName = toolName;
+    this.riskLevel = riskLevel;
+    this.toolArgs =
+        toolArgs == null
+            ? Collections.emptyMap()
+            : Collections.unmodifiableMap(new LinkedHashMap<>(toolArgs));
+  }
+
+  /** Returns the request identifier given at construction. */
+  public String requestId() {
+    return requestId;
+  }
+  /** Returns the tool-call identifier given at construction. */
+  public String toolCallId() {
+    return toolCallId;
+  }
+  /** Returns the tool name given at construction. */
+  public String toolName() {
+    return toolName;
+  }
+  /** Returns the tool arguments as an unmodifiable map; never {@code null}. */
+  public Map toolArgs() {
+    return toolArgs;
+  }
+  /** Returns the risk level given at construction. */
+  public ToolRiskLevel riskLevel() {
+    return riskLevel;
+  }
+
+  /** Renders only {@code requestId}, {@code toolName} and {@code riskLevel}. */
+  @Override
+  public String toString() {
+    return String.format(
+        "ApprovalRequest{requestId='%s', toolName='%s', riskLevel=%s}",
+        requestId, toolName, riskLevel);
+  }
+}
diff --git a/externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/runtime/event/ContentComplete.java b/externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/runtime/event/ContentComplete.java
new file mode 100644
index 00000000000..c1b60f88401
--- /dev/null
+++ b/externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/runtime/event/ContentComplete.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.dataagent.runtime.event;
+
+/** Event carrying the complete LLM output for one reasoning step. */
+public final class ContentComplete extends AgentEvent {
+  private final String fullText;
+  /** Creates an {@link EventType#CONTENT_COMPLETE} event holding {@code fullText}. */
+  public ContentComplete(String fullText) {
+    super(EventType.CONTENT_COMPLETE);
+    this.fullText = fullText;
+  }
+  /** Returns the text supplied at construction; {@code null} if {@code null} was supplied. */
+  public String fullText() {
+    return fullText;
+  }
+  /** Reports only the text length ({@code 0} for a {@code null} text), not the text itself. */
+  @Override
+  public String toString() {
+    return String.format("ContentComplete{length=%s}", fullText == null ? 0 : fullText.length());
+  }
+}
diff --git a/externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/runtime/event/ContentDelta.java b/externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/runtime/event/ContentDelta.java
new file mode 100644
index 00000000000..9b921864011
--- /dev/null
+++ b/externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/runtime/event/ContentDelta.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.dataagent.runtime.event;
+
+/** Event carrying a single token or chunk of the LLM streaming response. */
+public final class ContentDelta extends AgentEvent {
+  private final String text;
+  /** Creates an {@link EventType#CONTENT_DELTA} event holding {@code text}. */
+  public ContentDelta(String text) {
+    super(EventType.CONTENT_DELTA);
+    this.text = text;
+  }
+  /** Returns the chunk text supplied at construction. */
+  public String text() {
+    return text;
+  }
+  /** Renders the text, cut to its first 200 characters plus {@code ...} when it is longer. */
+  @Override
+  public String toString() {
+    String shown = text == null || text.length() <= 200 ? text : text.substring(0, 200) + "...";
+    return String.format("ContentDelta{text='%s'}", shown);
+  }
+}
diff --git a/externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/runtime/event/EventType.java b/externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/runtime/event/EventType.java
new file mode 100644
index 00000000000..937422e2bf5
--- /dev/null
+++ b/externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/runtime/event/EventType.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.dataagent.runtime.event;
+
+/**
+ * Enumerates the types of events emitted by the ReAct agent loop. Each value maps to a
+ * corresponding {@link AgentEvent} subclass and carries an SSE event name used for wire
+ * serialization.
+ */
+public enum EventType {
+
+ /** The agent has started processing a user query. */
+ AGENT_START("agent_start"),
+
+ /** A new ReAct iteration is starting. */
+ STEP_START("step_start"),
+
+ /** A single token or chunk from the LLM streaming response. */
+ CONTENT_DELTA("content_delta"),
+
+ /** The complete LLM output for one reasoning step. */
+ CONTENT_COMPLETE("content_complete"),
+
+ /** The agent is about to invoke a tool. */
+ TOOL_CALL("tool_call"),
+
+ /** The result of a tool invocation. */
+ TOOL_RESULT("tool_result"),
+
+ /** A ReAct iteration has completed. */
+ STEP_END("step_end"),
+
+ /** An error occurred during agent execution. */
+ ERROR("error"),
+
+ /** The agent requires user approval before executing a tool. */
+ APPROVAL_REQUEST("approval_request"),
+
+ /** The agent has finished its analysis. */
+ AGENT_FINISH("agent_finish");
+
+ private final String sseEventName;
+ /** Binds the constant to the SSE event name written in its declaration above. */
+ EventType(String sseEventName) {
+ this.sseEventName = sseEventName;
+ }
+
+ /** Returns the SSE event name used in the {@code event:} field of the SSE protocol. */
+ public String sseEventName() {
+ return sseEventName;
+ }
+}
diff --git a/externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/runtime/event/StepEnd.java b/externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/runtime/event/StepEnd.java
new file mode 100644
index 00000000000..128c90ec7d4
--- /dev/null
+++ b/externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/runtime/event/StepEnd.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.dataagent.runtime.event;
+
+/** Event marking that a ReAct iteration has completed. */
+public final class StepEnd extends AgentEvent {
+  private final int stepNumber;
+  /** Creates an {@link EventType#STEP_END} event for the given step number. */
+  public StepEnd(int stepNumber) {
+    super(EventType.STEP_END);
+    this.stepNumber = stepNumber;
+  }
+  /** Returns the step number supplied at construction. */
+  public int stepNumber() {
+    return stepNumber;
+  }
+  /** Formats the event as {@code StepEnd{stepNumber=N}}. */
+  @Override
+  public String toString() {
+    return String.format("StepEnd{stepNumber=%s}", stepNumber);
+  }
+}
diff --git a/externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/runtime/event/StepStart.java b/externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/runtime/event/StepStart.java
new file mode 100644
index 00000000000..6794ec839a2
--- /dev/null
+++ b/externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/runtime/event/StepStart.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.dataagent.runtime.event;
+
+/** Event marking that a new ReAct iteration is starting. */
+public final class StepStart extends AgentEvent {
+  private final int stepNumber;
+  /** Creates an {@link EventType#STEP_START} event for the given step number. */
+  public StepStart(int stepNumber) {
+    super(EventType.STEP_START);
+    this.stepNumber = stepNumber;
+  }
+  /** Returns the step number supplied at construction. */
+  public int stepNumber() {
+    return stepNumber;
+  }
+  /** Formats the event as {@code StepStart{stepNumber=N}}. */
+  @Override
+  public String toString() {
+    return String.format("StepStart{stepNumber=%s}", stepNumber);
+  }
+}
diff --git a/externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/runtime/event/ToolCall.java b/externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/runtime/event/ToolCall.java
new file mode 100644
index 00000000000..7d32114d383
--- /dev/null
+++ b/externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/runtime/event/ToolCall.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.dataagent.runtime.event;
+
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+/** Event announcing that the agent is about to invoke a tool. */
+public final class ToolCall extends AgentEvent {
+  private final String toolCallId;
+  private final String toolName;
+  private final Map toolArgs;
+
+  /** Creates the event; {@code toolArgs} is copied into an unmodifiable map (null -> empty). */
+  public ToolCall(String toolCallId, String toolName, Map toolArgs) {
+    super(EventType.TOOL_CALL);
+    this.toolCallId = toolCallId;
+    this.toolName = toolName;
+    this.toolArgs =
+        toolArgs == null
+            ? Collections.emptyMap()
+            : Collections.unmodifiableMap(new LinkedHashMap<>(toolArgs));
+  }
+
+  /** Returns the tool-call identifier given at construction. */
+  public String toolCallId() {
+    return toolCallId;
+  }
+
+  /** Returns the tool name given at construction. */
+  public String toolName() {
+    return toolName;
+  }
+
+  /** Returns the tool arguments as an unmodifiable map; never {@code null}. */
+  public Map toolArgs() {
+    return toolArgs;
+  }
+
+  /** Renders the identifier, the tool name and the full argument map. */
+  @Override
+  public String toString() {
+    return String.format(
+        "ToolCall{id='%s', toolName='%s', toolArgs=%s}", toolCallId, toolName, toolArgs);
+  }
+}
diff --git a/externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/runtime/event/ToolResult.java b/externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/runtime/event/ToolResult.java
new file mode 100644
index 00000000000..5e910aae751
--- /dev/null
+++ b/externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/runtime/event/ToolResult.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.dataagent.runtime.event;
+
+/** The result of a tool invocation. */
+public final class ToolResult extends AgentEvent {
+ private final String toolCallId;
+ private final String toolName;
+ private final String output;
+ private final boolean isError;
+
+ public ToolResult(String toolCallId, String toolName, String output, boolean isError) {
+ super(EventType.TOOL_RESULT);
+ this.toolCallId = toolCallId;
+ this.toolName = toolName;
+ this.output = output;
+ this.isError = isError;
+ }
+
+ public String toolCallId() {
+ return toolCallId;
+ }
+
+ public String toolName() {
+ return toolName;
+ }
+
+ public String output() {
+ return output;
+ }
+
+ public boolean isError() {
+ return isError;
+ }
+
+ @Override
+ public String toString() {
+ return "ToolResult{id='"
+ + toolCallId
+ + "', toolName='"
+ + toolName
+ + "', isError="
+ + isError
+ + ", output='"
+ + (output != null && output.length() > 200 ? output.substring(0, 200) + "..." : output)
+ + "'}";
+ }
+}
diff --git a/externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/tool/ToolRiskLevel.java b/externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/tool/ToolRiskLevel.java
new file mode 100644
index 00000000000..f4d1c541641
--- /dev/null
+++ b/externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/tool/ToolRiskLevel.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.dataagent.tool;
+
+/** Risk level of an agent tool, used to decide whether user approval is required before it runs. */
+public enum ToolRiskLevel {
+ /** Read-only operations that do not modify data (e.g. SELECT, DESCRIBE, schema inspection). */
+ SAFE,
+ /** Operations that can modify data or schema (e.g. INSERT, UPDATE, DELETE, DROP). */
+ DESTRUCTIVE
+}
diff --git a/externals/kyuubi-data-agent-engine/src/main/scala/org/apache/kyuubi/engine/dataagent/DataAgentBackendService.scala b/externals/kyuubi-data-agent-engine/src/main/scala/org/apache/kyuubi/engine/dataagent/DataAgentBackendService.scala
new file mode 100644
index 00000000000..9f161e590c8
--- /dev/null
+++ b/externals/kyuubi-data-agent-engine/src/main/scala/org/apache/kyuubi/engine/dataagent/DataAgentBackendService.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kyuubi.engine.dataagent
+
+import org.apache.kyuubi.engine.dataagent.session.DataAgentSessionManager
+import org.apache.kyuubi.service.AbstractBackendService
+import org.apache.kyuubi.session.SessionManager
+
+class DataAgentBackendService
+ extends AbstractBackendService("DataAgentBackendService") {
+ /** Session manager of this backend service: a `DataAgentSessionManager` built at construction. */
+ override val sessionManager: SessionManager = new DataAgentSessionManager()
+
+}
diff --git a/externals/kyuubi-data-agent-engine/src/main/scala/org/apache/kyuubi/engine/dataagent/DataAgentEngine.scala b/externals/kyuubi-data-agent-engine/src/main/scala/org/apache/kyuubi/engine/dataagent/DataAgentEngine.scala
new file mode 100644
index 00000000000..e064122868d
--- /dev/null
+++ b/externals/kyuubi-data-agent-engine/src/main/scala/org/apache/kyuubi/engine/dataagent/DataAgentEngine.scala
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kyuubi.engine.dataagent
+
+import org.apache.kyuubi.{Logging, Utils}
+import org.apache.kyuubi.Utils.{addShutdownHook, JDBC_ENGINE_SHUTDOWN_PRIORITY}
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.engine.dataagent.DataAgentEngine.currentEngine
+import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_CONN_RETRY_POLICY
+import org.apache.kyuubi.ha.client.RetryPolicies
+import org.apache.kyuubi.service.Serverable
+import org.apache.kyuubi.util.SignalRegister
+
+class DataAgentEngine extends Serverable("DataAgentEngine") {
+
+  override val backendService = new DataAgentBackendService()
+  override val frontendServices = Seq(new DataAgentTBinaryFrontendService(this))
+  /** Starts the server and registers a callback that sets `selfExited` and stops `currentEngine`. */
+  override def start(): Unit = {
+    super.start()
+    backendService.sessionManager.startTerminatingChecker { () =>
+      selfExited = true
+      currentEngine.foreach(engine => engine.stop())
+    }
+  }
+  /** Intentionally empty: this engine does nothing in the `stopServer` hook. */
+  override protected def stopServer(): Unit = {}
+}
+
+object DataAgentEngine extends Logging {
+
+  /** Shared engine configuration; `main` passes it to `Utils.fromCommandLineArgs` before start. */
+  val kyuubiConf: KyuubiConf = KyuubiConf()
+
+  /** The engine created by `startEngine`; `None` until that method has been called. */
+  @volatile var currentEngine: Option[DataAgentEngine] = None
+
+  /** Creates the engine, publishes it, initializes and starts it, then adds a stop hook. */
+  def startEngine(): Unit = {
+    currentEngine = Some(new DataAgentEngine())
+    currentEngine.foreach { engine =>
+      engine.initialize(kyuubiConf)
+      engine.start()
+      addShutdownHook(() => engine.stop(), JDBC_ENGINE_SHUTDOWN_PRIORITY + 1)
+    }
+  }
+
+  /** Entry point: reads `args` into the config, sets defaults, starts; logs and rethrows errors. */
+  def main(args: Array[String]): Unit = {
+    SignalRegister.registerLogger(logger)
+    try {
+      Utils.fromCommandLineArgs(args, kyuubiConf)
+      kyuubiConf.setIfMissing(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0)
+      kyuubiConf.setIfMissing(HA_ZK_CONN_RETRY_POLICY, RetryPolicies.N_TIME.toString)
+      startEngine()
+    } catch {
+      case t: Throwable if currentEngine.isDefined =>
+        // Log first: if stop() fails too, the root cause is already recorded and still rethrown.
+        error("Failed to create Data Agent Engine", t)
+        currentEngine.foreach { engine =>
+          try engine.stop()
+          catch { case scala.util.control.NonFatal(e) if e ne t => t.addSuppressed(e) }
+        }
+        throw t
+      case t: Throwable =>
+        error("Failed to create Data Agent Engine.", t)
+        throw t
+    }
+  }
+}
diff --git a/externals/kyuubi-data-agent-engine/src/main/scala/org/apache/kyuubi/engine/dataagent/DataAgentTBinaryFrontendService.scala b/externals/kyuubi-data-agent-engine/src/main/scala/org/apache/kyuubi/engine/dataagent/DataAgentTBinaryFrontendService.scala
new file mode 100644
index 00000000000..12abf86943f
--- /dev/null
+++ b/externals/kyuubi-data-agent-engine/src/main/scala/org/apache/kyuubi/engine/dataagent/DataAgentTBinaryFrontendService.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kyuubi.engine.dataagent
+
+import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_ENGINE_ID
+import org.apache.kyuubi.ha.client.{EngineServiceDiscovery, ServiceDiscovery}
+import org.apache.kyuubi.service.{Serverable, Service, TBinaryFrontendService}
+
+/**
+ * Thrift binary frontend of the Data Agent engine.
+ *
+ * @param serverable the server this frontend is attached to
+ */
+class DataAgentTBinaryFrontendService(override val serverable: Serverable)
+  extends TBinaryFrontendService("DataAgentTBinaryFrontend") {
+
+  /** An [[EngineServiceDiscovery]] when the configuration supports service discovery. */
+  override lazy val discoveryService: Option[Service] = {
+    val discoveryEnabled = ServiceDiscovery.supportServiceDiscovery(conf)
+    if (!discoveryEnabled) None else Some(new EngineServiceDiscovery(this))
+  }
+
+  /** The inherited attributes, extended with the engine id when one is configured. */
+  override def attributes: Map[String, String] = {
+    val inherited = super.attributes
+    conf.getAll.get(KYUUBI_ENGINE_ID) match {
+      case Some(engineId) => inherited ++ Map(KYUUBI_ENGINE_ID -> engineId)
+      case None => inherited
+    }
+  }
+}
diff --git a/externals/kyuubi-data-agent-engine/src/main/scala/org/apache/kyuubi/engine/dataagent/operation/ApproveToolCall.scala b/externals/kyuubi-data-agent-engine/src/main/scala/org/apache/kyuubi/engine/dataagent/operation/ApproveToolCall.scala
new file mode 100644
index 00000000000..bc37910a0cf
--- /dev/null
+++ b/externals/kyuubi-data-agent-engine/src/main/scala/org/apache/kyuubi/engine/dataagent/operation/ApproveToolCall.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kyuubi.engine.dataagent.operation
+
+import com.fasterxml.jackson.databind.ObjectMapper
+
+import org.apache.kyuubi.Logging
+import org.apache.kyuubi.engine.dataagent.provider.DataAgentProvider
+import org.apache.kyuubi.operation.{ArrayFetchIterator, OperationState}
+import org.apache.kyuubi.session.Session
+
+/**
+ * Synchronous operation that answers a pending tool-approval request.
+ *
+ * The statement must be `__approve:<requestId>` or `__deny:<requestId>`; whitespace around
+ * the statement and around the request id is ignored. The decision is passed to the
+ * provider's `resolveApproval`, and the operation yields a single row containing a JSON
+ * object with the fields `status` (`ok` or `not_found`), `action` (`approved` or `denied`)
+ * and `requestId`.
+ */
+class ApproveToolCall(
+    session: Session,
+    override val statement: String,
+    dataAgentProvider: DataAgentProvider)
+  extends DataAgentOperation(session) with Logging {
+
+  import ApproveToolCall.{APPROVE_PREFIX, DENY_PREFIX, JSON}
+
+  override val shouldRunAsync: Boolean = false
+
+  override protected def runInternal(): Unit = {
+    setState(OperationState.RUNNING)
+
+    try {
+      val command = statement.trim
+
+      // Work out the decision first; anything without a known prefix is rejected.
+      val approved =
+        if (command.startsWith(APPROVE_PREFIX)) {
+          true
+        } else if (command.startsWith(DENY_PREFIX)) {
+          false
+        } else {
+          throw new IllegalArgumentException(s"Invalid approval command: $command")
+        }
+
+      // Whatever follows the matched prefix is the request id.
+      val matchedPrefix = if (approved) APPROVE_PREFIX else DENY_PREFIX
+      val requestId = command.substring(matchedPrefix.length).trim
+      if (requestId.isEmpty) {
+        throw new IllegalArgumentException("requestId cannot be empty")
+      }
+
+      val resolved = dataAgentProvider.resolveApproval(requestId, approved)
+
+      val payload = JSON.createObjectNode()
+      payload.put("status", if (resolved) "ok" else "not_found")
+      payload.put("action", if (approved) "approved" else "denied")
+      payload.put("requestId", requestId)
+      val row = Array(JSON.writeValueAsString(payload))
+
+      iter = new ArrayFetchIterator[Array[String]](Array(row))
+      setState(OperationState.FINISHED)
+    } catch {
+      onError()
+    }
+  }
+}
+
+/** Statement prefixes and helpers shared by the tool-approval operation. */
+object ApproveToolCall {
+  private val JSON = new ObjectMapper()
+
+  /** Prefix of a statement that approves a pending tool call. */
+  val APPROVE_PREFIX = "__approve:"
+
+  /** Prefix of a statement that denies a pending tool call. */
+  val DENY_PREFIX = "__deny:"
+
+  /**
+   * Tells whether `statement`, once trimmed, starts with one of the approval prefixes.
+   *
+   * @param statement the raw statement text
+   * @return true for approve and deny commands, false for anything else
+   */
+  def isApprovalCommand(statement: String): Boolean = {
+    val candidate = statement.trim
+    Seq(APPROVE_PREFIX, DENY_PREFIX).exists(prefix => candidate.startsWith(prefix))
+  }
+}
diff --git a/externals/kyuubi-data-agent-engine/src/main/scala/org/apache/kyuubi/engine/dataagent/operation/DataAgentOperation.scala b/externals/kyuubi-data-agent-engine/src/main/scala/org/apache/kyuubi/engine/dataagent/operation/DataAgentOperation.scala
new file mode 100644
index 00000000000..9fc56653bae
--- /dev/null
+++ b/externals/kyuubi-data-agent-engine/src/main/scala/org/apache/kyuubi/engine/dataagent/operation/DataAgentOperation.scala
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kyuubi.engine.dataagent.operation
+
+import org.apache.kyuubi.{KyuubiSQLException, Utils}
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.engine.dataagent.schema.{DataAgentTRowSetGenerator, SchemaHelper}
+import org.apache.kyuubi.engine.dataagent.schema.DataAgentTRowSetGenerator.COL_STRING_TYPE
+import org.apache.kyuubi.operation.{AbstractOperation, FetchIterator, OperationState}
+import org.apache.kyuubi.operation.FetchOrientation.{FETCH_FIRST, FETCH_NEXT, FETCH_PRIOR, FetchOrientation}
+import org.apache.kyuubi.session.Session
+import org.apache.kyuubi.shaded.hive.service.rpc.thrift._
+
+/**
+ * Base class of the Data Agent operations. Results are rows of strings served through
+ * [[iter]] and described to clients as a single string column named `reply`.
+ *
+ * Rows may be fetched while the operation is still RUNNING, which lets a subclass stream
+ * results by appending to its iterator while clients poll.
+ */
+abstract class DataAgentOperation(session: Session) extends AbstractOperation(session) {
+
+  /** Result iterator; must be assigned by the subclass before results are fetched. */
+  @volatile protected var iter: FetchIterator[Array[String]] = _
+
+  protected lazy val conf: KyuubiConf = session.sessionManager.getConf
+
+  /**
+   * Moves the iterator according to `order`, takes up to `rowSetSize` rows and wraps them
+   * in a fetch response.
+   *
+   * `hasMoreRows` is true when rows are still buffered or when the operation was still
+   * RUNNING, so clients know to poll again.
+   *
+   * @throws IllegalStateException if the operation is neither FINISHED nor RUNNING
+   * @throws IllegalArgumentException if the result iterator has not been assigned yet
+   */
+  override def getNextRowSetInternal(
+      order: FetchOrientation,
+      rowSetSize: Int): TFetchResultsResp = {
+    validateDefaultFetchOrientation(order)
+    // Allow fetching during RUNNING state for streaming support
+    val stateAtFetch = state
+    if (stateAtFetch != OperationState.FINISHED && stateAtFetch != OperationState.RUNNING) {
+      throw new IllegalStateException(
+        s"Expected state FINISHED or RUNNING, but found $stateAtFetch")
+    }
+    require(iter != null, s"Operation $statementId result iterator not initialized")
+    setHasResultSet(true)
+    order match {
+      case FETCH_NEXT =>
+        iter.fetchNext()
+      case FETCH_PRIOR =>
+        iter.fetchPrior(rowSetSize)
+      case FETCH_FIRST =>
+        iter.fetchAbsolute(0)
+    }
+
+    val taken = iter.take(rowSetSize).map(_.toSeq)
+    val resultRowSet = new DataAgentTRowSetGenerator().toTRowSet(
+      taken.toSeq,
+      Seq(COL_STRING_TYPE),
+      getProtocolVersion)
+    resultRowSet.setStartRowOffset(iter.getPosition)
+    val resp = new TFetchResultsResp(OK_STATUS)
+    resp.setResults(resultRowSet)
+    // Read the state BEFORE probing the iterator. Probing first leaves a window in which
+    // a producer appends its last row and leaves RUNNING between the two reads, making
+    // both checks false while a row is still buffered. With this order, a state that has
+    // already left RUNNING means the rows appended before that transition are seen by
+    // hasNext.
+    val stillRunning = state == OperationState.RUNNING
+    resp.setHasMoreRows(stillRunning || iter.hasNext)
+    resp
+  }
+
+  override def cancel(): Unit = {
+    cleanup(OperationState.CANCELED)
+  }
+
+  override def close(): Unit = {
+    cleanup(OperationState.CLOSED)
+  }
+
+  /**
+   * Builds the handler used in the `catch` clause of an operation body. Under the
+   * operation lock it records the failure as the operation exception and then:
+   *  - in TIMEOUT state, throws a `KyuubiSQLException` describing the timeout;
+   *  - in any other terminal state, only logs a warning (nothing is thrown);
+   *  - otherwise, moves the operation to ERROR and throws a `KyuubiSQLException`
+   *    wrapping the original failure.
+   */
+  protected def onError(): PartialFunction[Throwable, Unit] = {
+    case e: Throwable =>
+      withLockRequired {
+        val errMsg = Utils.stringifyException(e)
+        if (state == OperationState.TIMEOUT) {
+          val ke = KyuubiSQLException(s"Timeout operating $opType: $errMsg")
+          setOperationException(ke)
+          throw ke
+        } else if (isTerminalState(state)) {
+          setOperationException(KyuubiSQLException(errMsg))
+          warn(s"Ignore exception in terminal state with $statementId: $errMsg")
+        } else {
+          error(s"Error operating $opType: $errMsg", e)
+          val ke = KyuubiSQLException(s"Error operating $opType: $errMsg", e)
+          setOperationException(ke)
+          setState(OperationState.ERROR)
+          throw ke
+        }
+      }
+  }
+
+  /** Marks the operation as PENDING and declares that it produces a result set. */
+  override protected def beforeRun(): Unit = {
+    setState(OperationState.PENDING)
+    setHasResultSet(true)
+  }
+
+  override protected def afterRun(): Unit = {}
+
+  /** Describes the result set: a single string column named `reply`. */
+  override def getResultSetMetadata: TGetResultSetMetadataResp = {
+    val tTableSchema = SchemaHelper.stringTTableSchema("reply")
+    val resp = new TGetResultSetMetadataResp
+    resp.setSchema(tTableSchema)
+    resp.setStatus(OK_STATUS)
+    resp
+  }
+
+  override def shouldRunAsync: Boolean = true
+}
diff --git a/externals/kyuubi-data-agent-engine/src/main/scala/org/apache/kyuubi/engine/dataagent/operation/DataAgentOperationManager.scala b/externals/kyuubi-data-agent-engine/src/main/scala/org/apache/kyuubi/engine/dataagent/operation/DataAgentOperationManager.scala
new file mode 100644
index 00000000000..59805954375
--- /dev/null
+++ b/externals/kyuubi-data-agent-engine/src/main/scala/org/apache/kyuubi/engine/dataagent/operation/DataAgentOperationManager.scala
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kyuubi.engine.dataagent.operation
+
+import java.util
+
+import org.apache.kyuubi.KyuubiSQLException
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.engine.dataagent.provider.DataAgentProvider
+import org.apache.kyuubi.operation.{Operation, OperationManager}
+import org.apache.kyuubi.session.Session
+
+/**
+ * Operation factory of the Data Agent engine. Only statement execution is implemented;
+ * every metadata and catalog/database operation is rejected as an unsupported feature.
+ *
+ * @param conf engine configuration (not read by this class itself)
+ * @param dataAgentProvider provider handed to every operation created here
+ */
+class DataAgentOperationManager(
+    conf: KyuubiConf,
+    dataAgentProvider: DataAgentProvider) extends OperationManager("DataAgentOperationManager") {
+
+  /**
+   * Creates and registers the operation for `statement`: an [[ApproveToolCall]] when the
+   * statement is an approval command, an [[ExecuteStatement]] otherwise.
+   */
+  override def newExecuteStatementOperation(
+      session: Session,
+      statement: String,
+      confOverlay: Map[String, String],
+      runAsync: Boolean,
+      queryTimeout: Long): Operation = {
+    val operation =
+      if (!ApproveToolCall.isApprovalCommand(statement)) {
+        new ExecuteStatement(
+          session,
+          statement,
+          confOverlay,
+          runAsync,
+          queryTimeout,
+          dataAgentProvider)
+      } else {
+        new ApproveToolCall(session, statement, dataAgentProvider)
+      }
+    addOperation(operation)
+  }
+
+  // Everything below is unsupported by this engine and fails immediately.
+
+  override def newGetTypeInfoOperation(session: Session): Operation =
+    throw KyuubiSQLException.featureNotSupported()
+
+  override def newGetCatalogsOperation(session: Session): Operation =
+    throw KyuubiSQLException.featureNotSupported()
+
+  override def newGetSchemasOperation(
+      session: Session,
+      catalog: String,
+      schema: String): Operation =
+    throw KyuubiSQLException.featureNotSupported()
+
+  override def newGetTablesOperation(
+      session: Session,
+      catalogName: String,
+      schemaName: String,
+      tableName: String,
+      tableTypes: util.List[String]): Operation =
+    throw KyuubiSQLException.featureNotSupported()
+
+  override def newGetTableTypesOperation(session: Session): Operation =
+    throw KyuubiSQLException.featureNotSupported()
+
+  override def newGetColumnsOperation(
+      session: Session,
+      catalogName: String,
+      schemaName: String,
+      tableName: String,
+      columnName: String): Operation =
+    throw KyuubiSQLException.featureNotSupported()
+
+  override def newGetFunctionsOperation(
+      session: Session,
+      catalogName: String,
+      schemaName: String,
+      functionName: String): Operation =
+    throw KyuubiSQLException.featureNotSupported()
+
+  override def newGetPrimaryKeysOperation(
+      session: Session,
+      catalogName: String,
+      schemaName: String,
+      tableName: String): Operation =
+    throw KyuubiSQLException.featureNotSupported()
+
+  override def newGetCrossReferenceOperation(
+      session: Session,
+      primaryCatalog: String,
+      primarySchema: String,
+      primaryTable: String,
+      foreignCatalog: String,
+      foreignSchema: String,
+      foreignTable: String): Operation =
+    throw KyuubiSQLException.featureNotSupported()
+
+  override def getQueryId(operation: Operation): String =
+    throw KyuubiSQLException.featureNotSupported()
+
+  override def newSetCurrentCatalogOperation(session: Session, catalog: String): Operation =
+    throw KyuubiSQLException.featureNotSupported()
+
+  override def newGetCurrentCatalogOperation(session: Session): Operation =
+    throw KyuubiSQLException.featureNotSupported()
+
+  override def newSetCurrentDatabaseOperation(session: Session, database: String): Operation =
+    throw KyuubiSQLException.featureNotSupported()
+
+  override def newGetCurrentDatabaseOperation(session: Session): Operation =
+    throw KyuubiSQLException.featureNotSupported()
+}
diff --git a/externals/kyuubi-data-agent-engine/src/main/scala/org/apache/kyuubi/engine/dataagent/operation/ExecuteStatement.scala b/externals/kyuubi-data-agent-engine/src/main/scala/org/apache/kyuubi/engine/dataagent/operation/ExecuteStatement.scala
new file mode 100644
index 00000000000..2ea1d8a41aa
--- /dev/null
+++ b/externals/kyuubi-data-agent-engine/src/main/scala/org/apache/kyuubi/engine/dataagent/operation/ExecuteStatement.scala
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kyuubi.engine.dataagent.operation
+
+import java.util.concurrent.RejectedExecutionException
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.databind.node.ObjectNode
+import org.slf4j.MDC
+
+import org.apache.kyuubi.{KyuubiSQLException, Logging}
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.engine.dataagent.provider.{DataAgentProvider, ProviderRunRequest}
+import org.apache.kyuubi.engine.dataagent.runtime.event.{AgentError, AgentEvent, AgentFinish, ApprovalRequest, ContentDelta, EventType, StepEnd, StepStart, ToolCall, ToolResult}
+import org.apache.kyuubi.operation.OperationState
+import org.apache.kyuubi.operation.log.OperationLog
+import org.apache.kyuubi.session.Session
+
+/**
+ * Runs a statement through the [[DataAgentProvider]] on a background thread and exposes
+ * the agent events it emits as result rows, one JSON object per row. Every row carries a
+ * `type` field holding the event's SSE event name; the remaining fields depend on the
+ * event type.
+ *
+ * NOTE(review): `runInternal` always submits the work to the background pool and never
+ * consults `shouldRunAsync` - confirm that this is intended.
+ */
+class ExecuteStatement(
+    session: Session,
+    override val statement: String,
+    confOverlay: Map[String, String],
+    override val shouldRunAsync: Boolean,
+    queryTimeout: Long,
+    dataAgentProvider: DataAgentProvider)
+  extends DataAgentOperation(session) with Logging {
+
+  import ExecuteStatement.JSON
+
+  private val operationLog: OperationLog = OperationLog.createOperationLog(session, getHandle)
+  override def getOperationLog: Option[OperationLog] = Option(operationLog)
+
+  private val incrementalIter = new IncrementalFetchIterator[Array[String]]()
+
+  /**
+   * Arms the timeout monitor, publishes the incremental iterator so results can be fetched
+   * while the agent is running, and submits the actual work as a background operation.
+   * A rejected submission puts the operation into ERROR and is rethrown as a
+   * `KyuubiSQLException`.
+   */
+  override protected def runInternal(): Unit = {
+    addTimeoutMonitor(queryTimeout)
+    iter = incrementalIter
+
+    val backgroundTask = new Runnable {
+      override def run(): Unit = executeStatement()
+    }
+
+    try {
+      setBackgroundHandle(session.sessionManager.submitBackgroundOperation(backgroundTask))
+    } catch {
+      case rejected: RejectedExecutionException =>
+        setState(OperationState.ERROR)
+        val ke =
+          KyuubiSQLException("Error submitting query in background, query rejected", rejected)
+        setOperationException(ke)
+        shutdownTimeoutMonitor()
+        throw ke
+    }
+  }
+
+  /** Serializes a fresh JSON object after letting `build` populate it. */
+  private def toJson(build: ObjectNode => Unit): String = {
+    val node = JSON.createObjectNode()
+    build(node)
+    JSON.writeValueAsString(node)
+  }
+
+  /** Appends one result row: a JSON object whose `type` field is written first. */
+  private def emit(sseType: String)(fill: ObjectNode => Unit): Unit = {
+    val json = toJson { node =>
+      node.put("type", sseType)
+      fill(node)
+    }
+    incrementalIter.append(Array(json))
+  }
+
+  /** Builds the provider request from the statement and the effective configuration. */
+  private def buildRequest(): ProviderRunRequest = {
+    val request = new ProviderRunRequest(statement)
+    // Session-level conf first, per-statement overlay on top (the overlay wins).
+    val effectiveConf = session.conf ++ confOverlay
+    effectiveConf
+      .get(KyuubiConf.ENGINE_DATA_AGENT_LLM_MODEL.key)
+      .foreach(model => request.modelName(model))
+    val approvalMode = effectiveConf.getOrElse(
+      KyuubiConf.ENGINE_DATA_AGENT_APPROVAL_MODE.key,
+      session.sessionManager.getConf.get(KyuubiConf.ENGINE_DATA_AGENT_APPROVAL_MODE))
+    request.approvalMode(approvalMode)
+    request
+  }
+
+  /** Converts one agent event into a result row; unlisted event types produce no row. */
+  private def publish(event: AgentEvent): Unit = {
+    val sseType = event.eventType().sseEventName()
+    event.eventType() match {
+      case EventType.AGENT_START =>
+        emit(sseType)(_ => ())
+      case EventType.STEP_START =>
+        val started = event.asInstanceOf[StepStart]
+        emit(sseType) { node =>
+          node.put("step", started.stepNumber())
+        }
+      case EventType.CONTENT_DELTA =>
+        val delta = event.asInstanceOf[ContentDelta]
+        emit(sseType) { node =>
+          node.put("text", delta.text())
+        }
+      case EventType.TOOL_CALL =>
+        val call = event.asInstanceOf[ToolCall]
+        emit(sseType) { node =>
+          node.put("id", call.toolCallId())
+          node.put("name", call.toolName())
+          node.set("args", JSON.valueToTree(call.toolArgs()))
+        }
+      case EventType.TOOL_RESULT =>
+        val result = event.asInstanceOf[ToolResult]
+        emit(sseType) { node =>
+          node.put("id", result.toolCallId())
+          node.put("name", result.toolName())
+          node.put("output", result.output())
+          node.put("isError", result.isError())
+        }
+      case EventType.STEP_END =>
+        val ended = event.asInstanceOf[StepEnd]
+        emit(sseType) { node =>
+          node.put("step", ended.stepNumber())
+        }
+      case EventType.ERROR =>
+        val failure = event.asInstanceOf[AgentError]
+        emit(sseType) { node =>
+          node.put("message", failure.message())
+        }
+      case EventType.APPROVAL_REQUEST =>
+        val approval = event.asInstanceOf[ApprovalRequest]
+        emit(sseType) { node =>
+          node.put("requestId", approval.requestId())
+          node.put("id", approval.toolCallId())
+          node.put("name", approval.toolName())
+          // Unlike TOOL_CALL, the arguments are emitted as their string form here.
+          node.put("args", approval.toolArgs().toString)
+          node.put("riskLevel", approval.riskLevel().name())
+        }
+      case EventType.AGENT_FINISH =>
+        val finished = event.asInstanceOf[AgentFinish]
+        emit(sseType) { node =>
+          node.put("steps", finished.totalSteps())
+        }
+      case _ => // CONTENT_COMPLETE — internal to middleware pipeline
+    }
+  }
+
+  /**
+   * Background body of the operation: tags the logging context with the operation and
+   * session ids, runs the provider with [[publish]] as the event sink, and marks the
+   * operation FINISHED. Failures go through `onError()`; the logging context and the
+   * timeout monitor are always cleaned up.
+   */
+  private def executeStatement(): Unit = {
+    setState(OperationState.RUNNING)
+
+    try {
+      val sessionId = session.handle.identifier.toString
+      val operationId = getHandle.identifier.toString
+      MDC.put("operationId", operationId)
+      MDC.put("sessionId", sessionId)
+
+      val request = buildRequest()
+      dataAgentProvider.run(sessionId, request, e => publish(e))
+
+      setState(OperationState.FINISHED)
+    } catch {
+      onError()
+    } finally {
+      MDC.remove("operationId")
+      MDC.remove("sessionId")
+      shutdownTimeoutMonitor()
+    }
+  }
+}
+
+object ExecuteStatement {
+  // One mapper instance reused for every JSON row produced by the operation.
+  private val JSON: ObjectMapper = new ObjectMapper()
+}
diff --git a/externals/kyuubi-data-agent-engine/src/main/scala/org/apache/kyuubi/engine/dataagent/operation/IncrementalFetchIterator.scala b/externals/kyuubi-data-agent-engine/src/main/scala/org/apache/kyuubi/engine/dataagent/operation/IncrementalFetchIterator.scala
new file mode 100644
index 00000000000..488977d07d4
--- /dev/null
+++ b/externals/kyuubi-data-agent-engine/src/main/scala/org/apache/kyuubi/engine/dataagent/operation/IncrementalFetchIterator.scala
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kyuubi.engine.dataagent.operation
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.kyuubi.operation.FetchIterator
+
+/**
+ * A [[FetchIterator]] whose content grows while it is being read: a producer thread adds
+ * items through [[append]] while a consumer thread reads them through the regular
+ * FetchIterator interface. Every method runs under the same internal lock.
+ *
+ * The backing store is an `ArrayBuffer` guarded by explicit synchronization rather than a
+ * `CopyOnWriteArrayList`, which would copy the whole array on each append (O(n) per
+ * append, O(n^2) overall for token-level streaming).
+ *
+ * Entries located before the start of the current fetch are dropped once enough of them
+ * have accumulated; positions that have been dropped can no longer be rewound to.
+ */
+class IncrementalFetchIterator[A] extends FetchIterator[A] {
+
+  private val buffer = new ArrayBuffer[A]()
+  private val lock = new AnyRef
+
+  // Cursor state uses logical indices, i.e. offsets from the very first appended item.
+  // The item stored at buffer(0) has logical index `trimmedCount`.
+  private var trimmedCount: Long = 0
+  @volatile private var fetchStart: Long = 0
+  @volatile private var position: Long = 0
+
+  // Minimum number of already-fetched entries before the buffer prefix is dropped.
+  private val COMPACT_THRESHOLD = 1024
+
+  /** Adds `item` at the end of the stream. Safe to call while another thread is reading. */
+  def append(item: A): Unit = lock.synchronized {
+    buffer += item
+  }
+
+  /** Starts a new fetch at the current read position. */
+  override def fetchNext(): Unit = lock.synchronized {
+    fetchStart = position
+    compactIfNeeded()
+  }
+
+  /**
+   * Starts a new fetch at logical index `pos`, clamped to the range that is still
+   * buffered: no lower than the first retained item, no higher than the end of the stream.
+   */
+  override def fetchAbsolute(pos: Long): Unit = lock.synchronized {
+    val endOfStream = trimmedCount + buffer.size
+    val clamped = math.min(math.max(pos, trimmedCount), endOfStream)
+    position = clamped
+    fetchStart = clamped
+    compactIfNeeded()
+  }
+
+  override def getFetchStart: Long = lock.synchronized { fetchStart }
+
+  override def getPosition: Long = lock.synchronized { position }
+
+  /** True when at least one appended item has not been read yet. */
+  override def hasNext: Boolean = lock.synchronized {
+    position < trimmedCount + buffer.size
+  }
+
+  /** Returns the item at the read position and advances past it. */
+  override def next(): A = lock.synchronized {
+    if (hasNext) {
+      val slot = (position - trimmedCount).toInt
+      position += 1
+      buffer(slot)
+    } else {
+      throw new NoSuchElementException("No more elements")
+    }
+  }
+
+  /**
+   * Frees memory by dropping the buffer entries that precede the start of the current
+   * fetch, once at least `COMPACT_THRESHOLD` of them have accumulated.
+   */
+  private def compactIfNeeded(): Unit = lock.synchronized {
+    val alreadyFetched = (fetchStart - trimmedCount).toInt
+    if (alreadyFetched >= COMPACT_THRESHOLD) {
+      buffer.remove(0, alreadyFetched)
+      trimmedCount += alreadyFetched
+    }
+  }
+}
diff --git a/externals/kyuubi-data-agent-engine/src/main/scala/org/apache/kyuubi/engine/dataagent/schema/DataAgentTRowSetGenerator.scala b/externals/kyuubi-data-agent-engine/src/main/scala/org/apache/kyuubi/engine/dataagent/schema/DataAgentTRowSetGenerator.scala
new file mode 100644
index 00000000000..960e12541bf
--- /dev/null
+++ b/externals/kyuubi-data-agent-engine/src/main/scala/org/apache/kyuubi/engine/dataagent/schema/DataAgentTRowSetGenerator.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.dataagent.schema
+
+import org.apache.kyuubi.engine.dataagent.schema.DataAgentTRowSetGenerator._
+import org.apache.kyuubi.engine.result.TRowSetGenerator
+import org.apache.kyuubi.shaded.hive.service.rpc.thrift._
+
+/**
+ * Row-set generator for rows made only of strings: a row is a `Seq[String]`, the schema
+ * is a `Seq` of column type names, and the only supported column type is
+ * `COL_STRING_TYPE`.
+ */
+class DataAgentTRowSetGenerator
+  extends TRowSetGenerator[Seq[String], Seq[String], String] {
+
+  override def getColumnSizeFromSchemaType(schema: Seq[String]): Int = schema.length
+
+  /** Every column is reported as a string column, whatever the schema entry says. */
+  override def getColumnType(schema: Seq[String], ordinal: Int): String = COL_STRING_TYPE
+
+  override def isColumnNullAt(row: Seq[String], ordinal: Int): Boolean = {
+    val cell = row(ordinal)
+    cell == null
+  }
+
+  override def getColumnAs[T](row: Seq[String], ordinal: Int): T = {
+    val cell = row(ordinal)
+    cell.asInstanceOf[T]
+  }
+
+  /** Builds the column at `ordinal`; fails for any type other than the string type. */
+  override def toTColumn(rows: Seq[Seq[String]], ordinal: Int, typ: String): TColumn = {
+    if (typ != COL_STRING_TYPE) {
+      throw new UnsupportedOperationException(s"type $typ")
+    }
+    asStringTColumn(rows, ordinal)
+  }
+
+  /** Builds the value at `ordinal`; fails for any type other than the string type. */
+  override def toTColumnValue(row: Seq[String], ordinal: Int, types: Seq[String]): TColumnValue = {
+    val columnType = getColumnType(types, ordinal)
+    if (columnType != COL_STRING_TYPE) {
+      throw new UnsupportedOperationException(s"type $columnType")
+    }
+    asStringTColumnValue(row, ordinal)
+  }
+}
+
+object DataAgentTRowSetGenerator {
+
+  /** Type name used for string columns: the simple name of the `String` class. */
+  val COL_STRING_TYPE: String = {
+    val stringClass = classOf[String]
+    stringClass.getSimpleName
+  }
+}
diff --git a/externals/kyuubi-data-agent-engine/src/main/scala/org/apache/kyuubi/engine/dataagent/schema/SchemaHelper.scala b/externals/kyuubi-data-agent-engine/src/main/scala/org/apache/kyuubi/engine/dataagent/schema/SchemaHelper.scala
new file mode 100644
index 00000000000..734efe2cf4e
--- /dev/null
+++ b/externals/kyuubi-data-agent-engine/src/main/scala/org/apache/kyuubi/engine/dataagent/schema/SchemaHelper.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.dataagent.schema
+
+import java.util.Collections
+
+import org.apache.kyuubi.shaded.hive.service.rpc.thrift._
+
+/** Builders for Thrift schema objects describing tables made only of string columns. */
+object SchemaHelper {
+
+  /** Type qualifiers holding an empty qualifier map. */
+  def stringTTypeQualifiers: TTypeQualifiers = {
+    val qualifiers = new TTypeQualifiers()
+    qualifiers.setQualifiers(Collections.emptyMap[String, TTypeQualifierValue]())
+    qualifiers
+  }
+
+  /** Type descriptor made of a single primitive `STRING_TYPE` entry. */
+  def stringTTypeDesc: TTypeDesc = {
+    val primitive = new TPrimitiveTypeEntry(TTypeId.STRING_TYPE)
+    primitive.setTypeQualifiers(stringTTypeQualifiers)
+    val typeDesc = new TTypeDesc()
+    typeDesc.addToTypes(TTypeEntry.primitiveEntry(primitive))
+    typeDesc
+  }
+
+  /**
+   * Descriptor of a string column.
+   *
+   * @param fieldName column name
+   * @param pos position stored in the descriptor
+   */
+  def stringTColumnDesc(fieldName: String, pos: Int): TColumnDesc = {
+    val column = new TColumnDesc()
+    column.setColumnName(fieldName)
+    column.setTypeDesc(stringTTypeDesc)
+    column.setPosition(pos)
+    column
+  }
+
+  /**
+   * Table schema with one string column per name; each column's position is the
+   * zero-based index of its name in `fieldNames`.
+   */
+  def stringTTableSchema(fieldNames: String*): TTableSchema = {
+    val schema = new TTableSchema()
+    for ((name, index) <- fieldNames.zipWithIndex) {
+      schema.addToColumns(stringTColumnDesc(name, index))
+    }
+    schema
+  }
+}
diff --git a/externals/kyuubi-data-agent-engine/src/main/scala/org/apache/kyuubi/engine/dataagent/session/DataAgentSessionImpl.scala b/externals/kyuubi-data-agent-engine/src/main/scala/org/apache/kyuubi/engine/dataagent/session/DataAgentSessionImpl.scala
new file mode 100644
index 00000000000..e3bf76e7298
--- /dev/null
+++ b/externals/kyuubi-data-agent-engine/src/main/scala/org/apache/kyuubi/engine/dataagent/session/DataAgentSessionImpl.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kyuubi.engine.dataagent.session
+
+import org.apache.kyuubi.{KYUUBI_VERSION, KyuubiSQLException}
+import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_HANDLE_KEY
+import org.apache.kyuubi.session.{AbstractSession, SessionHandle, SessionManager}
+import org.apache.kyuubi.shaded.hive.service.rpc.thrift.{TGetInfoType, TGetInfoValue, TProtocolVersion}
+
+/**
+ * Session of the Data Agent engine. Opening the session opens a matching provider session
+ * keyed by the session handle identifier, and closing it closes the provider session.
+ *
+ * The session manager passed in must be a [[DataAgentSessionManager]].
+ */
+class DataAgentSessionImpl(
+    protocol: TProtocolVersion,
+    user: String,
+    password: String,
+    ipAddress: String,
+    conf: Map[String, String],
+    sessionManager: SessionManager)
+  extends AbstractSession(protocol, user, password, ipAddress, conf, sessionManager) {
+
+  // Reuse the handle supplied in the session conf when present, else create a new one.
+  override val handle: SessionHandle =
+    conf.get(KYUUBI_SESSION_HANDLE_KEY).map(SessionHandle.fromUUID).getOrElse(SessionHandle())
+
+  private val dataAgentProvider =
+    sessionManager.asInstanceOf[DataAgentSessionManager].dataAgentProvider
+
+  /**
+   * Opens the provider session, then the session itself. If the latter fails, the
+   * provider session is closed again (a failure of that cleanup is only logged) and the
+   * original failure is rethrown.
+   */
+  override def open(): Unit = {
+    info(s"Starting to open data agent session.")
+    val providerSessionId = handle.identifier.toString
+    dataAgentProvider.open(providerSessionId, user)
+    try {
+      super.open()
+    } catch {
+      case openFailure: Throwable =>
+        releaseProviderQuietly(providerSessionId)
+        throw openFailure
+    }
+    info(s"The data agent session is started.")
+  }
+
+  /** Closes the provider session, logging instead of propagating any failure. */
+  private def releaseProviderQuietly(providerSessionId: String): Unit = {
+    try {
+      dataAgentProvider.close(providerSessionId)
+    } catch {
+      case cleanupFailure: Throwable =>
+        error("Failed to cleanup data agent provider on session open failure", cleanupFailure)
+    }
+  }
+
+  /**
+   * Answers the supported `GetInfo` requests; any other info type is rejected with a
+   * `KyuubiSQLException`.
+   */
+  override def getInfo(infoType: TGetInfoType): TGetInfoValue = withAcquireRelease() {
+    infoType match {
+      case TGetInfoType.CLI_SERVER_NAME | TGetInfoType.CLI_DBMS_NAME =>
+        TGetInfoValue.stringValue("Kyuubi Data Agent Engine")
+      case TGetInfoType.CLI_DBMS_VER =>
+        TGetInfoValue.stringValue(KYUUBI_VERSION)
+      case TGetInfoType.CLI_ODBC_KEYWORDS =>
+        TGetInfoValue.stringValue("Unimplemented")
+      case TGetInfoType.CLI_MAX_COLUMN_NAME_LEN |
+          TGetInfoType.CLI_MAX_SCHEMA_NAME_LEN |
+          TGetInfoType.CLI_MAX_TABLE_NAME_LEN =>
+        TGetInfoValue.lenValue(128)
+      case _ =>
+        throw KyuubiSQLException(s"Unrecognized GetInfoType value: $infoType")
+    }
+  }
+
+  /** Closes the provider session first; the session itself is closed even if that fails. */
+  override def close(): Unit = {
+    val providerSessionId = handle.identifier.toString
+    try {
+      dataAgentProvider.close(providerSessionId)
+    } finally {
+      super.close()
+    }
+  }
+
+}
diff --git a/externals/kyuubi-data-agent-engine/src/main/scala/org/apache/kyuubi/engine/dataagent/session/DataAgentSessionManager.scala b/externals/kyuubi-data-agent-engine/src/main/scala/org/apache/kyuubi/engine/dataagent/session/DataAgentSessionManager.scala
new file mode 100644
index 00000000000..30b6d5ba6ba
--- /dev/null
+++ b/externals/kyuubi-data-agent-engine/src/main/scala/org/apache/kyuubi/engine/dataagent/session/DataAgentSessionManager.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kyuubi.engine.dataagent.session
+
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.config.KyuubiConf.ENGINE_SHARE_LEVEL
+import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_HANDLE_KEY
+import org.apache.kyuubi.engine.ShareLevel
+import org.apache.kyuubi.engine.dataagent.DataAgentEngine
+import org.apache.kyuubi.engine.dataagent.operation.DataAgentOperationManager
+import org.apache.kyuubi.engine.dataagent.provider.DataAgentProvider
+import org.apache.kyuubi.operation.OperationManager
+import org.apache.kyuubi.session.{Session, SessionHandle, SessionManager}
+import org.apache.kyuubi.shaded.hive.service.rpc.thrift.TProtocolVersion
+
+/**
+ * Session manager of the Data Agent engine. The [[DataAgentProvider]] it loads is shared
+ * with the operation manager and read by the sessions created here.
+ */
+class DataAgentSessionManager(name: String) extends SessionManager(name) {
+
+  def this() = this(classOf[DataAgentSessionManager].getSimpleName)
+
+  override protected def isServer: Boolean = false
+
+  lazy val dataAgentProvider: DataAgentProvider = DataAgentProvider.load(conf)
+
+  override lazy val operationManager: OperationManager =
+    new DataAgentOperationManager(conf, dataAgentProvider)
+
+  override def initialize(conf: KyuubiConf): Unit = {
+    this.conf = conf
+    super.initialize(conf)
+  }
+
+  // Reuses the session `getSessionOption` finds for the handle in `conf`, else creates one.
+  override protected def createSession(
+      protocol: TProtocolVersion,
+      user: String,
+      password: String,
+      ipAddress: String,
+      conf: Map[String, String]): Session = {
+    val existing = conf.get(KYUUBI_SESSION_HANDLE_KEY)
+      .map(SessionHandle.fromUUID)
+      .flatMap(getSessionOption)
+    existing.getOrElse(new DataAgentSessionImpl(protocol, user, password, ipAddress, conf, this))
+  }
+
+  override def stop(): Unit = {
+    try dataAgentProvider.stop()
+    finally super.stop()
+  }
+
+  // Stops the whole engine after the session is closed when the share level is CONNECTION.
+  override def closeSession(sessionHandle: SessionHandle): Unit = {
+    super.closeSession(sessionHandle)
+    if (conf.get(ENGINE_SHARE_LEVEL) == ShareLevel.CONNECTION.toString) {
+      info("Data Agent engine stopped due to session stopped and shared level is CONNECTION.")
+      stopEngine()
+    }
+  }
+
+  private def stopEngine(): Unit = DataAgentEngine.currentEngine.foreach(_.stop())
+}
diff --git a/externals/kyuubi-data-agent-engine/src/test/java/org/apache/kyuubi/engine/dataagent/provider/echo/EchoProviderTest.java b/externals/kyuubi-data-agent-engine/src/test/java/org/apache/kyuubi/engine/dataagent/provider/echo/EchoProviderTest.java
new file mode 100644
index 00000000000..2bb7fd1eef0
--- /dev/null
+++ b/externals/kyuubi-data-agent-engine/src/test/java/org/apache/kyuubi/engine/dataagent/provider/echo/EchoProviderTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.dataagent.provider.echo;
+
+import static org.junit.Assert.*;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.kyuubi.engine.dataagent.provider.ProviderRunRequest;
+import org.apache.kyuubi.engine.dataagent.runtime.event.AgentEvent;
+import org.apache.kyuubi.engine.dataagent.runtime.event.ContentComplete;
+import org.apache.kyuubi.engine.dataagent.runtime.event.ContentDelta;
+import org.apache.kyuubi.engine.dataagent.runtime.event.EventType;
+import org.junit.Test;
+
+/** Smoke tests for EchoProvider — verify event stream structure and content echo. */
+public class EchoProviderTest {
+
+  @Test
+  public void testEventSequenceAndContentEcho() {
+    EchoProvider provider = new EchoProvider();
+    List<AgentEvent> events = new ArrayList<>();
+
+    provider.open("session-1", "testuser");
+    provider.run("session-1", new ProviderRunRequest("What is Kyuubi?"), events::add);
+    provider.close("session-1");
+
+    assertFalse("Should emit events", events.isEmpty());
+
+    // Expected order: AGENT_START -> STEP_START -> CONTENT_DELTA... -> CONTENT_COMPLETE ->
+    // STEP_END -> AGENT_FINISH. Only the first two and the last are checked by position.
+    List<EventType> types = events.stream().map(AgentEvent::eventType).collect(Collectors.toList());
+    assertEquals(EventType.AGENT_START, types.get(0));
+    assertEquals(EventType.STEP_START, types.get(1));
+    assertEquals(EventType.AGENT_FINISH, types.get(types.size() - 1));
+    assertTrue(types.contains(EventType.CONTENT_DELTA));
+    assertTrue(types.contains(EventType.CONTENT_COMPLETE));
+    assertTrue(types.contains(EventType.STEP_END));
+
+    // Verify deltas concatenate to the complete content, which echoes the question
+    StringBuilder deltas = new StringBuilder();
+    String complete = null;
+    for (AgentEvent event : events) {
+      if (event instanceof ContentDelta) {
+        deltas.append(((ContentDelta) event).text());
+      }
+      if (event instanceof ContentComplete) {
+        complete = ((ContentComplete) event).fullText();
+      }
+    }
+    assertNotNull(complete);
+    assertEquals(complete, deltas.toString());
+    assertTrue("Should echo the question", complete.contains("What is Kyuubi?"));
+  }
+}
diff --git a/externals/kyuubi-data-agent-engine/src/test/java/org/apache/kyuubi/engine/dataagent/runtime/event/EventTest.java b/externals/kyuubi-data-agent-engine/src/test/java/org/apache/kyuubi/engine/dataagent/runtime/event/EventTest.java
new file mode 100644
index 00000000000..50d22da416d
--- /dev/null
+++ b/externals/kyuubi-data-agent-engine/src/test/java/org/apache/kyuubi/engine/dataagent/runtime/event/EventTest.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.dataagent.runtime.event;
+
+import static org.junit.Assert.*;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.kyuubi.engine.dataagent.tool.ToolRiskLevel;
+import org.junit.Test;
+
+/** Tests for agent events: toString output, null handling, argument immutability, SSE names. */
+public class EventTest {
+  @Test
+  public void testContentDeltaLongTextTruncated() {
+    String longText = new String(new char[300]).replace('\0', 'a');
+    ContentDelta event = new ContentDelta(longText);
+    String str = event.toString();
+    assertTrue(str.contains("..."));
+    assertTrue(str.length() < longText.length() + 50);
+  }
+
+  @Test
+  public void testContentDeltaNullText() {
+    ContentDelta event = new ContentDelta(null);
+    assertNull(event.text());
+  }
+
+  @Test
+  public void testContentCompleteNull() {
+    ContentComplete event = new ContentComplete(null);
+    assertTrue(event.toString().contains("length=0"));
+  }
+
+  @Test
+  public void testToolCallArgsImmutable() {
+    Map<String, Object> args = new HashMap<>();
+    args.put("key", "value");
+    ToolCall event = new ToolCall("tc-1", "tool", args);
+    try {
+      event.toolArgs().put("new", "entry");
+      fail("Should throw on modification");
+    } catch (UnsupportedOperationException expected) {
+      // expected: the map returned by toolArgs() must reject modification
+    }
+  }
+
+  @Test
+  public void testToolCallNullArgs() {
+    ToolCall event = new ToolCall("tc-1", "tool", null);
+    assertNotNull(event.toolArgs());
+    assertTrue(event.toolArgs().isEmpty());
+  }
+
+  @Test
+  public void testToolResultError() {
+    ToolResult event = new ToolResult("tc-2", "sql_query", "syntax error", true);
+    assertTrue(event.isError());
+    assertTrue(event.toString().contains("isError=true"));
+  }
+
+  @Test
+  public void testToolResultLongOutputTruncated() {
+    String longOutput = new String(new char[300]).replace('\0', 'x');
+    ToolResult event = new ToolResult("tc-1", "tool", longOutput, false);
+    String str = event.toString();
+    assertTrue(str.contains("..."));
+  }
+
+  @Test
+  public void testApprovalRequestArgsImmutable() {
+    Map<String, Object> args = new HashMap<>();
+    args.put("key", "value");
+    ApprovalRequest event = new ApprovalRequest("req-1", "tc-1", "tool", args, ToolRiskLevel.SAFE);
+    try {
+      event.toolArgs().put("new", "entry");
+      fail("Should throw on modification");
+    } catch (UnsupportedOperationException expected) {
+      // expected: the map returned by toolArgs() must reject modification
+    }
+  }
+
+  @Test
+  public void testApprovalRequestNullArgs() {
+    ApprovalRequest event = new ApprovalRequest("req-1", "tc-1", "tool", null, ToolRiskLevel.SAFE);
+    assertNotNull(event.toolArgs());
+    assertTrue(event.toolArgs().isEmpty());
+  }
+
+  @Test
+  public void testEventTypeSseNames() {
+    assertEquals("agent_start", EventType.AGENT_START.sseEventName());
+    assertEquals("step_start", EventType.STEP_START.sseEventName());
+    assertEquals("content_delta", EventType.CONTENT_DELTA.sseEventName());
+    assertEquals("content_complete", EventType.CONTENT_COMPLETE.sseEventName());
+    assertEquals("tool_call", EventType.TOOL_CALL.sseEventName());
+    assertEquals("tool_result", EventType.TOOL_RESULT.sseEventName());
+    assertEquals("step_end", EventType.STEP_END.sseEventName());
+    assertEquals("error", EventType.ERROR.sseEventName());
+    assertEquals("approval_request", EventType.APPROVAL_REQUEST.sseEventName());
+    assertEquals("agent_finish", EventType.AGENT_FINISH.sseEventName());
+  }
+
+  @Test
+  public void testAllEventTypesHaveUniqueSseNames() {
+    EventType[] values = EventType.values();
+    java.util.Set<String> names = new java.util.HashSet<>();
+    for (EventType type : values) {
+      assertTrue("Duplicate SSE name: " + type.sseEventName(), names.add(type.sseEventName()));
+    }
+    assertEquals(10, values.length);
+  }
+}
diff --git a/externals/kyuubi-data-agent-engine/src/test/scala/org/apache/kyuubi/engine/dataagent/WithDataAgentEngine.scala b/externals/kyuubi-data-agent-engine/src/test/scala/org/apache/kyuubi/engine/dataagent/WithDataAgentEngine.scala
new file mode 100644
index 00000000000..1de14cb366a
--- /dev/null
+++ b/externals/kyuubi-data-agent-engine/src/test/scala/org/apache/kyuubi/engine/dataagent/WithDataAgentEngine.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kyuubi.engine.dataagent
+
+import org.apache.kyuubi.KyuubiFunSuite
+import org.apache.kyuubi.config.KyuubiConf
+
+/** Test mixin that starts a [[DataAgentEngine]] for the suite and stops it afterwards. */
+trait WithDataAgentEngine extends KyuubiFunSuite {
+
+  protected var engine: DataAgentEngine = _
+  protected var connectionUrl: String = _
+
+  protected val kyuubiConf: KyuubiConf = DataAgentEngine.kyuubiConf
+
+  def withKyuubiConf: Map[String, String]
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    startEngine()
+  }
+
+  override def afterAll(): Unit = {
+    stopEngine()
+    super.afterAll()
+  }
+
+  /** Stops the engine if one was started and clears the reference. */
+  def stopEngine(): Unit = Option(engine).foreach { started =>
+    started.stop()
+    engine = null
+  }
+
+  /** Applies the suite settings plus bind port "0", starts the engine and records its URL. */
+  def startEngine(): Unit = {
+    val settings = withKyuubiConf ++ Map(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT.key -> "0")
+    for ((key, value) <- settings) {
+      System.setProperty(key, value)
+      kyuubiConf.set(key, value)
+    }
+    DataAgentEngine.startEngine()
+    engine = DataAgentEngine.currentEngine.get
+    connectionUrl = engine.frontendServices.head.connectionUrl
+  }
+
+  protected def jdbcConnectionUrl: String = s"jdbc:hive2://$connectionUrl/;"
+}
diff --git a/externals/kyuubi-data-agent-engine/src/test/scala/org/apache/kyuubi/engine/dataagent/operation/ApprovalWorkflowIntegrationSuite.scala b/externals/kyuubi-data-agent-engine/src/test/scala/org/apache/kyuubi/engine/dataagent/operation/ApprovalWorkflowIntegrationSuite.scala
new file mode 100644
index 00000000000..99d65a6009c
--- /dev/null
+++ b/externals/kyuubi-data-agent-engine/src/test/scala/org/apache/kyuubi/engine/dataagent/operation/ApprovalWorkflowIntegrationSuite.scala
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.dataagent.operation
+
+import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}
+
+import org.apache.kyuubi.config.KyuubiConf._
+import org.apache.kyuubi.engine.dataagent.WithDataAgentEngine
+import org.apache.kyuubi.operation.HiveJDBCTestHelper
+
+/**
+ * JDBC-level tests of the `__approve:` / `__deny:` commands against an engine running
+ * the echo provider, plus a direct check of `ApproveToolCall.isApprovalCommand`.
+ */
+class ApprovalWorkflowIntegrationSuite extends HiveJDBCTestHelper with WithDataAgentEngine {
+
+  private val JSON = new ObjectMapper()
+
+  override def withKyuubiConf: Map[String, String] =
+    Map(ENGINE_DATA_AGENT_PROVIDER.key -> "echo")
+
+  override protected def jdbcUrl: String = jdbcConnectionUrl
+
+  /** Runs `sql` and returns the "reply" column of its first row; fails if there is none. */
+  private def getFirstRow(stmt: java.sql.Statement, sql: String): String = {
+    val rows = stmt.executeQuery(sql)
+    assert(rows.next(), s"Expected at least one row for: $sql")
+    rows.getString("reply")
+  }
+
+  /** Parse a JSON response and return the node. */
+  private def parseJson(raw: String): JsonNode = JSON.readTree(raw)
+
+  /**
+   * Sends `command` and asserts that the JSON reply has status "not_found" together with
+   * the expected `action` and `requestId`.
+   */
+  private def assertNotFound(command: String, action: String, requestId: String): Unit = {
+    withJdbcStatement() { stmt =>
+      val node = parseJson(getFirstRow(stmt, command))
+      assert(node.get("status").asText() === "not_found")
+      assert(node.get("action").asText() === action)
+      assert(node.get("requestId").asText() === requestId)
+    }
+  }
+
+  /**
+   * Sends `command` and asserts that it fails with a message containing "empty" or
+   * "requestid" (compared in lower case).
+   */
+  private def assertRejected(command: String): Unit = {
+    withJdbcStatement() { stmt =>
+      val failure = intercept[Exception](stmt.executeQuery(command))
+      val message = failure.getMessage.toLowerCase
+      assert(message.contains("empty") || message.contains("requestid"))
+    }
+  }
+
+  test("__approve: returns not_found for non-existent request") {
+    assertNotFound("__approve:non-existent-id-123", "approved", "non-existent-id-123")
+  }
+
+  test("__deny: returns not_found for non-existent request") {
+    assertNotFound("__deny:non-existent-id-456", "denied", "non-existent-id-456")
+  }
+
+  test("__approve: with empty requestId throws error") {
+    assertRejected("__approve:")
+  }
+
+  test("__deny: with empty requestId throws error") {
+    assertRejected("__deny:")
+  }
+
+  test("non-approval command routes to echo provider") {
+    withJdbcStatement() { stmt =>
+      // A statement not starting with __approve: or __deny: will be treated
+      // as a regular query (not an approval command), so it goes through the echo
+      // provider instead. This verifies routing behavior.
+      val rows = stmt.executeQuery("hello world")
+      val reply = new StringBuilder
+      while (rows.next()) {
+        reply.append(rows.getString("reply"))
+      }
+      // The echo provider wraps response in JSON events -- verify it contains the text
+      val raw = reply.toString()
+      assert(raw.contains("content_delta"), s"Expected content_delta events in: $raw")
+      // "hello" and "world" may be in separate content_delta events
+      assert(raw.contains("hello") && raw.contains("world"), s"Expected echo of input in: $raw")
+    }
+  }
+
+  test("isApprovalCommand correctly identifies prefixes") {
+    Seq("__approve:abc", "__deny:abc", " __approve:abc ", " __deny:abc ")
+      .foreach(command => assert(ApproveToolCall.isApprovalCommand(command)))
+    Seq("SELECT 1", "__invalid:abc", "")
+      .foreach(command => assert(!ApproveToolCall.isApprovalCommand(command)))
+  }
+}
diff --git a/externals/kyuubi-data-agent-engine/src/test/scala/org/apache/kyuubi/engine/dataagent/operation/DataAgentE2ESuite.scala b/externals/kyuubi-data-agent-engine/src/test/scala/org/apache/kyuubi/engine/dataagent/operation/DataAgentE2ESuite.scala
new file mode 100644
index 00000000000..977c655dabd
--- /dev/null
+++ b/externals/kyuubi-data-agent-engine/src/test/scala/org/apache/kyuubi/engine/dataagent/operation/DataAgentE2ESuite.scala
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.dataagent.operation
+
+import java.sql.DriverManager
+
+import org.apache.kyuubi.config.KyuubiConf._
+import org.apache.kyuubi.engine.dataagent.WithDataAgentEngine
+import org.apache.kyuubi.operation.HiveJDBCTestHelper
+
+/**
+ * End-to-end test for the Data Agent engine.
+ * Full pipeline: JDBC Client -> Kyuubi Thrift -> DataAgentEngine
+ * -> LLM -> Tools -> SQLite -> Results
+ *
+ * Requires DATA_AGENT_LLM_API_KEY and DATA_AGENT_LLM_API_URL environment variables.
+ */
+class DataAgentE2ESuite extends HiveJDBCTestHelper with WithDataAgentEngine {
+
+ private val apiKey = sys.env.getOrElse("DATA_AGENT_LLM_API_KEY", "")
+ private val apiUrl = sys.env.getOrElse("DATA_AGENT_LLM_API_URL", "")
+ private val modelName = sys.env.getOrElse("DATA_AGENT_LLM_MODEL", "gpt-4o")
+ private val dbPath =
+ s"${System.getProperty("java.io.tmpdir")}/dataagent_e2e_test_${java.util.UUID.randomUUID()}.db"
+
+ override def withKyuubiConf: Map[String, String] = Map(
+ ENGINE_DATA_AGENT_PROVIDER.key -> "OPENAI_COMPATIBLE",
+ ENGINE_DATA_AGENT_LLM_API_KEY.key -> apiKey,
+ ENGINE_DATA_AGENT_LLM_API_URL.key -> apiUrl,
+ ENGINE_DATA_AGENT_LLM_MODEL.key -> modelName,
+ ENGINE_DATA_AGENT_MAX_ITERATIONS.key -> "10",
+ ENGINE_DATA_AGENT_APPROVAL_MODE.key -> "AUTO_APPROVE",
+ ENGINE_DATA_AGENT_JDBC_URL.key -> s"jdbc:sqlite:$dbPath")
+
+ override protected def jdbcUrl: String = jdbcConnectionUrl
+
+ private val enabled: Boolean = apiKey.nonEmpty && apiUrl.nonEmpty // both must be set
+
+ override def beforeAll(): Unit = {
+ if (enabled) { // otherwise neither the database nor the engine is set up
+ setupTestDatabase()
+ super.beforeAll()
+ }
+ }
+
+ override def afterAll(): Unit = {
+ if (enabled) { // mirrors beforeAll: nothing to tear down when disabled
+ super.afterAll() // NOTE(review): if this throws, the database file is not deleted
+ cleanupTestDatabase()
+ }
+ }
+
+ private def setupTestDatabase(): Unit = {
+ // Clean up any previous test DB
+ new java.io.File(dbPath).delete()
+ val conn = DriverManager.getConnection(s"jdbc:sqlite:$dbPath")
+ try {
+ val stmt = conn.createStatement() // NOTE(review): not closed explicitly; only conn is
+ // Create departments table
+ stmt.execute("""
+ CREATE TABLE departments (
+ id INTEGER PRIMARY KEY,
+ name TEXT NOT NULL,
+ location TEXT
+ )""")
+ stmt.execute("INSERT INTO departments VALUES (1, 'Engineering', 'Beijing')")
+ stmt.execute("INSERT INTO departments VALUES (2, 'Sales', 'Shanghai')")
+ stmt.execute("INSERT INTO departments VALUES (3, 'Marketing', 'Hangzhou')")
+
+ // Create employees table
+ stmt.execute("""
+ CREATE TABLE employees (
+ id INTEGER PRIMARY KEY,
+ name TEXT NOT NULL,
+ department_id INTEGER,
+ salary REAL,
+ hire_date TEXT,
+ FOREIGN KEY (department_id) REFERENCES departments(id)
+ )""")
+ stmt.execute("INSERT INTO employees VALUES (1, 'Alice', 1, 25000, '2022-01-15')")
+ stmt.execute("INSERT INTO employees VALUES (2, 'Bob', 1, 30000, '2021-06-01')")
+ stmt.execute("INSERT INTO employees VALUES (3, 'Charlie', 2, 20000, '2023-03-10')")
+ stmt.execute("INSERT INTO employees VALUES (4, 'Diana', 2, 22000, '2022-09-20')")
+ stmt.execute("INSERT INTO employees VALUES (5, 'Eve', 3, 18000, '2023-07-01')")
+ stmt.execute("INSERT INTO employees VALUES (6, 'Frank', 1, 35000, '2020-04-15')")
+ } finally {
+ conn.close()
+ }
+ }
+
+ private def cleanupTestDatabase(): Unit = {
+ new java.io.File(dbPath).delete() // the return value is ignored
+ }
+
+ test("E2E: agent answers data question through full Kyuubi pipeline") {
+ assume(enabled, "DATA_AGENT_LLM_API_KEY/API_URL not set, skipping E2E tests")
+ // scalastyle:off println
+ withJdbcStatement() { stmt =>
+ // Ask a question that requires schema exploration + SQL execution
+ val result = stmt.executeQuery(
+ "Which department has the highest average salary?")
+
+ val sb = new StringBuilder
+ while (result.next()) {
+ val chunk = result.getString("reply")
+ sb.append(chunk)
+ print(chunk) // real-time output for debugging
+ }
+ println()
+
+ val reply = sb.toString()
+
+ // The agent should have:
+ // 1. Explored the schema (mentioned table names or columns)
+ // 2. Executed SQL (the reply should contain actual data)
+ // 3. Answered with "Engineering" (avg salary 30000)
+ assert(reply.nonEmpty, "Agent should return a non-empty response")
+ assert(
+ reply.toLowerCase.contains("engineering") || reply.contains("30000"),
+ s"Expected the answer to mention 'Engineering' or '30000', got: ${reply.take(500)}")
+ }
+ // scalastyle:on println
+ }
+}
diff --git a/externals/kyuubi-data-agent-engine/src/test/scala/org/apache/kyuubi/engine/dataagent/operation/DataAgentOperationSuite.scala b/externals/kyuubi-data-agent-engine/src/test/scala/org/apache/kyuubi/engine/dataagent/operation/DataAgentOperationSuite.scala
new file mode 100644
index 00000000000..df6e2632c1d
--- /dev/null
+++ b/externals/kyuubi-data-agent-engine/src/test/scala/org/apache/kyuubi/engine/dataagent/operation/DataAgentOperationSuite.scala
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.dataagent.operation
+
+import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}
+
+import org.apache.kyuubi.config.KyuubiConf._
+import org.apache.kyuubi.engine.dataagent.WithDataAgentEngine
+import org.apache.kyuubi.operation.HiveJDBCTestHelper
+
+class DataAgentOperationSuite extends HiveJDBCTestHelper with WithDataAgentEngine {
+
+  private val JSON = new ObjectMapper()
+
+  override def withKyuubiConf: Map[String, String] = Map(
+    ENGINE_DATA_AGENT_PROVIDER.key -> "echo")
+
+  override protected def jdbcUrl: String = jdbcConnectionUrl
+
+  /**
+   * Extracts the concatenated text of all `content_delta` events from the raw reply, which
+   * is read as a sequence of JSON objects. Parsing is best effort: a parse failure ends the
+   * extraction and the text collected so far is returned.
+   */
+  private def extractText(rawReply: String): String = {
+    val sb = new StringBuilder
+    // Each row is a JSON object; parse and extract "text" from content events
+    val parser = JSON.getFactory.createParser(rawReply)
+    parser.configure(
+      com.fasterxml.jackson.core.JsonParser.Feature.AUTO_CLOSE_SOURCE,
+      false)
+    try {
+      val it = JSON.readValues(parser, classOf[JsonNode])
+      while (it.hasNext) {
+        val node = it.next()
+        if (node.has("type") && "content_delta" == node.get("type").asText() && node.has("text")) {
+          sb.append(node.get("text").asText())
+        }
+      }
+    } catch {
+      case _: Exception => // ignore trailing parse issues
+    } finally {
+      parser.close() // release the parser once the reply has been consumed
+    }
+    sb.toString()
+  }
+
+  /** Runs `sql` and concatenates the "reply" column of every returned row. */
+  private def fetchReply(stmt: java.sql.Statement, sql: String): String = {
+    val result = stmt.executeQuery(sql)
+    val sb = new StringBuilder
+    while (result.next()) {
+      sb.append(result.getString("reply"))
+    }
+    sb.toString()
+  }
+
+  test("echo provider returns streaming response via JDBC") {
+    withJdbcStatement() { stmt =>
+      val text = extractText(fetchReply(stmt, "What tables are in the database?"))
+      assert(text.contains("[DataAgent Echo]"))
+      assert(text.contains("What tables are in the database?"))
+    }
+  }
+
+  test("multiple queries in same session") {
+    withJdbcStatement() { stmt =>
+      assert(extractText(fetchReply(stmt, "first question")).contains("first question"))
+      assert(extractText(fetchReply(stmt, "second question")).contains("second question"))
+    }
+  }
+}
diff --git a/externals/kyuubi-data-agent-engine/src/test/scala/org/apache/kyuubi/engine/dataagent/operation/IncrementalFetchIteratorCompactionSuite.scala b/externals/kyuubi-data-agent-engine/src/test/scala/org/apache/kyuubi/engine/dataagent/operation/IncrementalFetchIteratorCompactionSuite.scala
new file mode 100644
index 00000000000..1071111dfd7
--- /dev/null
+++ b/externals/kyuubi-data-agent-engine/src/test/scala/org/apache/kyuubi/engine/dataagent/operation/IncrementalFetchIteratorCompactionSuite.scala
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.dataagent.operation
+
+import org.apache.kyuubi.KyuubiFunSuite
+
+/**
+ * Tests for the compaction behavior of [[IncrementalFetchIterator]].
+ * Compaction removes consumed entries from the front of the internal buffer
+ * when the consumed prefix exceeds COMPACT_THRESHOLD (1024).
+ */
+class IncrementalFetchIteratorCompactionSuite extends KyuubiFunSuite {
+
+  private val COMPACT_THRESHOLD = 1024
+
+  test("compaction triggers after consuming >= 1024 items") {
+    val total = 2000
+    val fetcher = new IncrementalFetchIterator[Int]()
+
+    // Fill the iterator with the values 0 until 2000
+    (0 until total).foreach(value => fetcher.append(value))
+
+    // Start a fetch and read the first 1024 values
+    fetcher.fetchNext()
+    (0 until COMPACT_THRESHOLD).foreach { _ =>
+      assert(fetcher.hasNext)
+      fetcher.next()
+    }
+    // The read position now equals the threshold
+    assert(fetcher.getPosition === COMPACT_THRESHOLD)
+
+    // The next fetchNext() is the call expected to trigger compaction
+    fetcher.fetchNext()
+    assert(fetcher.getFetchStart === COMPACT_THRESHOLD)
+
+    // Everything after the consumed prefix must still come back in order
+    var seen = 0
+    while (fetcher.hasNext) {
+      val v = fetcher.next()
+      assert(v === COMPACT_THRESHOLD + seen, s"Expected ${COMPACT_THRESHOLD + seen} but got $v")
+      seen += 1
+    }
+    assert(seen === total - COMPACT_THRESHOLD)
+    assert(fetcher.getPosition === total)
+  }
+
+  test("no compaction when consumed items below threshold") {
+    val fetcher = new IncrementalFetchIterator[Int]()
+
+    // 500 values stay below the compaction threshold
+    (0 until 500).foreach(value => fetcher.append(value))
+
+    // Read all of them back
+    fetcher.fetchNext()
+    (0 until 500).foreach { expected =>
+      assert(fetcher.next() === expected)
+    }
+
+    // fetchNext triggers compactIfNeeded, but 500 < 1024 so no compaction
+    fetcher.fetchNext()
+
+    // Position and fetch start both sit at the end; nothing is left to read
+    assert(fetcher.getPosition === 500)
+    assert(fetcher.getFetchStart === 500)
+    assert(!fetcher.hasNext)
+  }
+
+  test("compaction preserves logical position correctness") {
+    // Use an exact multiple of COMPACT_THRESHOLD to avoid partial batch issues
+    val total = 3 * COMPACT_THRESHOLD // 3072
+    val fetcher = new IncrementalFetchIterator[Int]()
+
+    (0 until total).foreach(value => fetcher.append(value))
+
+    // Three threshold-sized batches, each started by its own fetchNext()
+    var readSoFar = 0
+    (0 until 3).foreach { _ =>
+      fetcher.fetchNext()
+      (0 until COMPACT_THRESHOLD).foreach { _ =>
+        assert(fetcher.hasNext)
+        val v = fetcher.next()
+        assert(v === readSoFar, s"Expected $readSoFar but got $v")
+        readSoFar += 1
+      }
+    }
+
+    // All items consumed
+    fetcher.fetchNext()
+    assert(!fetcher.hasNext)
+    assert(readSoFar === total, s"Expected to read $total items, read $readSoFar")
+  }
+
+  test("fetchAbsolute works correctly after compaction") {
+    val total = 2048
+    val fetcher = new IncrementalFetchIterator[Int]()
+
+    (0 until total).foreach(value => fetcher.append(value))
+
+    // Consume 1024 to trigger compaction
+    fetcher.fetchNext()
+    (0 until COMPACT_THRESHOLD).foreach(_ => fetcher.next())
+    fetcher.fetchNext() // triggers compaction
+
+    // Seek to a position that is still buffered
+    fetcher.fetchAbsolute(1500)
+    assert(fetcher.getPosition === 1500)
+    assert(fetcher.hasNext)
+    assert(fetcher.next() === 1500)
+
+    // Position 0 lies in the prefix that has been trimmed away, so the seek is
+    // expected to be clamped to the first retained position (1024)
+    fetcher.fetchAbsolute(0)
+    assert(fetcher.getPosition === COMPACT_THRESHOLD)
+    assert(fetcher.hasNext)
+    assert(fetcher.next() === COMPACT_THRESHOLD)
+  }
+
+  test("interleaved append and consume with compaction") {
+    val fetcher = new IncrementalFetchIterator[Int]()
+
+    // First batch: 1024 values
+    (0 until COMPACT_THRESHOLD).foreach(value => fetcher.append(value))
+
+    // Read the whole first batch
+    fetcher.fetchNext()
+    (0 until COMPACT_THRESHOLD).foreach { expected =>
+      assert(fetcher.next() === expected)
+    }
+
+    // Second batch: another 1024 values
+    (COMPACT_THRESHOLD until 2 * COMPACT_THRESHOLD).foreach(value => fetcher.append(value))
+
+    // fetchNext triggers compaction of first 1024
+    fetcher.fetchNext()
+
+    // The second batch must be readable in order
+    (COMPACT_THRESHOLD until 2 * COMPACT_THRESHOLD).foreach { expected =>
+      assert(fetcher.hasNext)
+      assert(fetcher.next() === expected)
+    }
+    assert(!fetcher.hasNext)
+    assert(fetcher.getPosition === 2 * COMPACT_THRESHOLD)
+  }
+}
diff --git a/externals/kyuubi-data-agent-engine/src/test/scala/org/apache/kyuubi/engine/dataagent/operation/IncrementalFetchIteratorSuite.scala b/externals/kyuubi-data-agent-engine/src/test/scala/org/apache/kyuubi/engine/dataagent/operation/IncrementalFetchIteratorSuite.scala
new file mode 100644
index 00000000000..4d996d01a20
--- /dev/null
+++ b/externals/kyuubi-data-agent-engine/src/test/scala/org/apache/kyuubi/engine/dataagent/operation/IncrementalFetchIteratorSuite.scala
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kyuubi.engine.dataagent.operation
+
+import java.util.concurrent.{CountDownLatch, CyclicBarrier, Executors, TimeUnit}
+import java.util.concurrent.atomic.AtomicInteger
+
+import org.apache.kyuubi.KyuubiFunSuite
+
+ /** Covers IncrementalFetchIterator cursor semantics and one-writer/one-reader concurrency. */
+ class IncrementalFetchIteratorSuite extends KyuubiFunSuite {
+
+   test("basic append and iterate") {
+     val iter = new IncrementalFetchIterator[String]()
+     Seq("a", "b", "c").foreach(item => iter.append(item))
+
+     iter.fetchNext()
+     assert(iter.hasNext)
+     assert(iter.next() === "a")
+     assert(iter.next() === "b")
+     assert(iter.next() === "c")
+     assert(!iter.hasNext)
+     assert(iter.getPosition === 3)
+   }
+
+   test("fetchAbsolute clamps to valid range") {
+     val iter = new IncrementalFetchIterator[Int]()
+     for (i <- 1 to 10) iter.append(i)
+
+     iter.fetchAbsolute(-5)
+     assert(iter.getPosition === 0)
+     assert(iter.getFetchStart === 0)
+
+     iter.fetchAbsolute(100)
+     assert(iter.getPosition === 10)
+     assert(iter.getFetchStart === 10)
+
+     iter.fetchAbsolute(5)
+     assert(iter.getPosition === 5)
+     assert(iter.next() === 6) // 0-based position 5 holds the 6th value, i.e. 6
+   }
+
+   test("fetchNext sets fetchStart to current position") {
+     val iter = new IncrementalFetchIterator[String]()
+     Seq("x", "y").foreach(item => iter.append(item))
+
+     iter.fetchNext()
+     assert(iter.getFetchStart === 0)
+     iter.next()
+     iter.next()
+     iter.fetchNext()
+     assert(iter.getFetchStart === 2)
+   }
+
+   test("concurrent append and read - producer/consumer") {
+     val iter = new IncrementalFetchIterator[Int]()
+     val totalItems = 10000
+     val consumed = new AtomicInteger(0)
+     val latch = new CountDownLatch(2)
+     // First error raised on a worker thread; rethrown on the test thread once both finished
+     val failure = new java.util.concurrent.atomic.AtomicReference[Throwable]()
+
+     // Producer thread
+     val producer = new Thread(() => {
+       try {
+         for (i <- 1 to totalItems) iter.append(i)
+       } catch {
+         case t: Throwable => failure.compareAndSet(null, t)
+       } finally {
+         latch.countDown()
+       }
+     })
+
+     // Consumer thread
+     val consumer = new Thread(() => {
+       var count = 0
+       try {
+         var lastSeen = 0
+         val deadline = System.currentTimeMillis() + 10000
+         while (count < totalItems && System.currentTimeMillis() < deadline) {
+           iter.fetchNext()
+           while (iter.hasNext) {
+             val v = iter.next()
+             assert(v === lastSeen + 1, s"Expected ${lastSeen + 1} but got $v")
+             lastSeen = v
+             count += 1
+           }
+           if (count < totalItems) Thread.`yield`()
+         }
+       } catch {
+         case t: Throwable => failure.compareAndSet(null, t)
+       } finally {
+         consumed.set(count) // publish the progress even when the loop was aborted
+         latch.countDown()
+       }
+     })
+
+     Seq(producer, consumer).foreach(_.start())
+     assert(latch.await(30, TimeUnit.SECONDS), "Timed out waiting for producer/consumer")
+     Option(failure.get()).foreach(t => throw t)
+     assert(consumed.get() === totalItems, s"Expected $totalItems items, got ${consumed.get()}")
+   }
+
+   test("concurrent writer does not corrupt single reader") {
+     // IncrementalFetchIterator is designed for single-consumer use.
+     // This test verifies that a single reader reading concurrently with
+     // a single writer does not produce exceptions or corrupt data.
+     val iter = new IncrementalFetchIterator[Int]()
+     val totalItems = 5000
+     val barrier = new CyclicBarrier(2)
+     val errors = new AtomicInteger(0)
+     val pool = Executors.newFixedThreadPool(2)
+
+     // Writer
+     val writerDone = pool.submit(new Runnable {
+       override def run(): Unit = {
+         barrier.await()
+         for (i <- 1 to totalItems) iter.append(i)
+       }
+     })
+
+     // Single reader
+     val readerDone = pool.submit(new Runnable {
+       override def run(): Unit = {
+         try {
+           barrier.await()
+           var count = 0
+           val deadline = System.currentTimeMillis() + 10000
+           while (count < totalItems && System.currentTimeMillis() < deadline) {
+             iter.fetchNext()
+             while (iter.hasNext) {
+               iter.next()
+               count += 1
+             }
+             if (count < totalItems) Thread.`yield`()
+           }
+           if (count != totalItems) errors.incrementAndGet()
+         } catch {
+           case _: Exception => errors.incrementAndGet()
+         }
+       }
+     })
+
+     pool.shutdown()
+     assert(pool.awaitTermination(30, TimeUnit.SECONDS))
+     // Future.get rethrows whatever escaped a task, which submit() would otherwise swallow
+     Seq(writerDone, readerDone).foreach(_.get())
+     assert(errors.get() === 0, s"Got ${errors.get()} errors")
+   }
+
+   test("empty iterator") {
+     val iter = new IncrementalFetchIterator[String]()
+     assert(!iter.hasNext)
+     assert(iter.getPosition === 0)
+     assert(iter.getFetchStart === 0)
+   }
+
+   test("position tracking across multiple fetch cycles") {
+     val iter = new IncrementalFetchIterator[String]()
+     Seq("a", "b", "c", "d").foreach(item => iter.append(item))
+
+     // First fetch: read 2
+     iter.fetchNext()
+     iter.next() // a
+     iter.next() // b
+     assert(iter.getPosition === 2)
+     assert(iter.getFetchStart === 0)
+
+     // Second fetch: read remaining
+     iter.fetchNext()
+     assert(iter.getFetchStart === 2)
+     iter.next() // c
+     iter.next() // d
+     assert(!iter.hasNext)
+     assert(iter.getPosition === 4)
+   }
+ }
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index 8e8d31550e8..05e93212990 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -3754,6 +3754,111 @@ object KyuubiConf {
.checkValue(_ >= 0, "must be 0 or positive number")
.createWithDefault(Duration.ofSeconds(120).toMillis)
+ val ENGINE_DATA_AGENT_MEMORY: ConfigEntry[String] =
+ buildConf("kyuubi.engine.data.agent.memory")
+ .doc("The heap memory for the Data Agent engine")
+ .version("1.12.0")
+ .stringConf
+ .createWithDefault("1g") // DataAgentProcessBuilder emits the value as -Xmx<value>
+
+ val ENGINE_DATA_AGENT_JAVA_OPTIONS: OptionalConfigEntry[String] =
+ buildConf("kyuubi.engine.data.agent.java.options")
+ .doc("The extra Java options for the Data Agent engine")
+ .version("1.12.0")
+ .stringConf
+ .createOptional // DataAgentProcessBuilder ignores a blank value
+
+ val ENGINE_DATA_AGENT_EXTRA_CLASSPATH: OptionalConfigEntry[String] =
+ buildConf("kyuubi.engine.data.agent.extra.classpath")
+ .doc("The extra classpath for the Data Agent engine, for configuring the location " +
+ "of the LLM SDK and etc.")
+ .version("1.12.0")
+ .stringConf
+ .createOptional // DataAgentProcessBuilder adds it after the engine's own classpath entries
+
+ val ENGINE_DATA_AGENT_PROVIDER: ConfigEntry[String] =
+   buildConf("kyuubi.engine.data.agent.provider")
+     .doc("The provider for the Data Agent engine. Candidates: <ul>" +
+       " <li>ECHO: simply echoes the input, for testing purpose.</li>" +
+       " <li>OPENAI_COMPATIBLE: OpenAI-compatible LLM provider.</li>" +
+       "</ul>")
+     .version("1.12.0")
+     .stringConf
+     .transform { // map the well-known aliases onto provider class names
+       case "ECHO" | "echo" =>
+         "org.apache.kyuubi.engine.dataagent.provider.echo.EchoProvider"
+       case "OPENAI_COMPATIBLE" | "openai_compatible" | "openai-compatible" =>
+         "org.apache.kyuubi.engine.dataagent.provider.openai.OpenAiProvider"
+       case other => other // any other value is kept as given
+     }
+     .createWithDefault("ECHO")
+
+ val ENGINE_DATA_AGENT_LLM_API_KEY: OptionalConfigEntry[String] =
+ buildConf("kyuubi.engine.data.agent.llm.api.key")
+ .doc("The API key to access the LLM service for the Data Agent engine.")
+ .version("1.12.0")
+ .stringConf
+ .createOptional // secret: DataAgentProcessBuilder.toString redacts the value of this key
+
+ val ENGINE_DATA_AGENT_LLM_MODEL: OptionalConfigEntry[String] =
+ buildConf("kyuubi.engine.data.agent.llm.model")
+ .doc("The model ID used by the Data Agent engine LLM provider.")
+ .version("1.12.0")
+ .stringConf
+ .createOptional // no default value is defined for this entry
+
+ val ENGINE_DATA_AGENT_LLM_API_URL: OptionalConfigEntry[String] =
+ buildConf("kyuubi.engine.data.agent.llm.api.url")
+ .doc("The API base URL for the LLM service used by the Data Agent engine.")
+ .version("1.12.0")
+ .stringConf
+ .createOptional // no default value is defined for this entry
+
+ val ENGINE_DATA_AGENT_MAX_ITERATIONS: ConfigEntry[Int] =
+ buildConf("kyuubi.engine.data.agent.max.iterations")
+ .doc("The maximum number of ReAct loop iterations for the Data Agent engine.")
+ .version("1.12.0")
+ .intConf
+ .checkValue(_ > 0, "must be positive number") // rejects 0 and negative values
+ .createWithDefault(100)
+
+ val ENGINE_DATA_AGENT_QUERY_TIMEOUT: ConfigEntry[Long] =
+ buildConf("kyuubi.engine.data.agent.query.timeout")
+ .doc("The query execution timeout for the Data Agent SQL tool.")
+ .version("1.12.0")
+ .timeConf // NOTE(review): no checkValue guard here; confirm 0/negative timeouts are intended
+ .createWithDefaultString("PT5M") // ISO-8601 duration: 5 minutes
+
+ val ENGINE_DATA_AGENT_JDBC_URL: OptionalConfigEntry[String] =
+ buildConf("kyuubi.engine.data.agent.jdbc.url")
+ .doc("The JDBC URL for the Data Agent engine to connect to the target database. " +
+ "If not set, the Data Agent will connect back to Kyuubi server " +
+ "via ZooKeeper service discovery.")
+ .version("1.12.0")
+ .stringConf
+ .createOptional // if unset, EngineRef derives a ZooKeeper discovery URL from HA addresses
+
+ val ENGINE_DATA_AGENT_APPROVAL_MODE: ConfigEntry[String] =
+   buildConf("kyuubi.engine.data.agent.approval.mode")
+     .doc("Default approval mode for tool execution in the Data Agent engine. " +
+       "Candidates: <ul>" +
+       " <li>AUTO_APPROVE: all tools are auto-approved without user interaction.</li>" +
+       " <li>NORMAL: only destructive tools require explicit approval.</li>" +
+       " <li>STRICT: all tools require explicit user approval.</li>" +
+       "</ul>")
+     .version("1.12.0")
+     .stringConf
+     .checkValues(Set("STRICT", "NORMAL", "AUTO_APPROVE")) // no case normalization before this
+     .createWithDefault("NORMAL")
+
+ val FRONTEND_DATA_AGENT_OPERATION_TIMEOUT: ConfigEntry[Long] =
+ buildConf("kyuubi.frontend.data.agent.operation.timeout")
+ .doc("Timeout for waiting on data agent engine launch and " +
+ "operation start in the REST frontend.")
+ .version("1.12.0")
+ .timeConf
+ .createWithDefaultString("PT2M") // ISO-8601 duration: 2 minutes
+
val ENGINE_JDBC_MEMORY: ConfigEntry[String] =
buildConf("kyuubi.engine.jdbc.memory")
.doc("The heap memory for the JDBC query engine")
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/engine/EngineType.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/engine/EngineType.scala
index 3d850ba14f5..2d733b87a4d 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/engine/EngineType.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/engine/EngineType.scala
@@ -23,5 +23,5 @@ package org.apache.kyuubi.engine
object EngineType extends Enumeration {
type EngineType = Value
- val SPARK_SQL, FLINK_SQL, CHAT, TRINO, HIVE_SQL, JDBC = Value
+ val SPARK_SQL, FLINK_SQL, CHAT, TRINO, HIVE_SQL, JDBC, DATA_AGENT = Value
}
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
index 0aa83662352..ab9e19cdb97 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
@@ -17,6 +17,8 @@
package org.apache.kyuubi.engine
+import java.nio.charset.StandardCharsets
+import java.security.MessageDigest
import java.util.concurrent.{Semaphore, TimeUnit}
import scala.collection.JavaConverters._
@@ -33,12 +35,13 @@ import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_ENGINE_SUBMIT_TIME_KEY
import org.apache.kyuubi.engine.EngineType._
import org.apache.kyuubi.engine.ShareLevel.{CONNECTION, GROUP, SERVER, SERVER_LOCAL, ShareLevel}
import org.apache.kyuubi.engine.chat.ChatProcessBuilder
+import org.apache.kyuubi.engine.dataagent.DataAgentProcessBuilder
import org.apache.kyuubi.engine.flink.FlinkProcessBuilder
import org.apache.kyuubi.engine.hive.HiveProcessBuilder
import org.apache.kyuubi.engine.jdbc.JdbcProcessBuilder
import org.apache.kyuubi.engine.spark.SparkProcessBuilder
import org.apache.kyuubi.engine.trino.TrinoProcessBuilder
-import org.apache.kyuubi.ha.HighAvailabilityConf.{HA_ENGINE_REF_ID, HA_NAMESPACE}
+import org.apache.kyuubi.ha.HighAvailabilityConf.{HA_ADDRESSES, HA_ENGINE_REF_ID, HA_NAMESPACE}
import org.apache.kyuubi.ha.client.{DiscoveryClient, DiscoveryClientProvider, DiscoveryPaths, ServiceNodeInfo}
import org.apache.kyuubi.metrics.MetricsConstants.{ENGINE_FAIL, ENGINE_TIMEOUT, ENGINE_TOTAL}
import org.apache.kyuubi.metrics.MetricsSystem
@@ -113,6 +116,22 @@ private[kyuubi] class EngineRef(
@VisibleForTesting
private[kyuubi] val subdomain: String = conf.get(ENGINE_SHARE_LEVEL_SUBDOMAIN) match {
+ // A DATA_AGENT engine is bound 1:1 to a JDBC datasource because the provider SPI is
+ // constructed once per engine from a single kyuubi.engine.data.agent.jdbc.url, and
+ // ProviderRunRequest carries no datasource field — there is no way to dispatch a request
+ // against a different JDBC URL than the one the provider was loaded with. Sessions
+ // targeting different datasources must therefore route to distinct engines, so this
+ // branch takes precedence over engine pool and manual subdomain settings (a Spark-focused
+ // deployment commonly sets kyuubi.engine.pool.size globally, which would otherwise
+ // override the datasource-based isolation and route different datasources to the same
+ // engine pool).
+ case _ if engineType == DATA_AGENT =>
+ conf.get(ENGINE_DATA_AGENT_JDBC_URL).map { url =>
+ val digest = MessageDigest.getInstance("SHA-256")
+ val hex = digest.digest(url.getBytes(StandardCharsets.UTF_8))
+ .take(8).map("%02x".format(_)).mkString
+ s"ds-$hex"
+ }.getOrElse("default")
case subdomain if clientPoolSize > 0 && (subdomain.isEmpty || enginePoolIgnoreSubdomain) =>
val poolSize = math.min(clientPoolSize, poolThreshold)
if (poolSize < clientPoolSize) {
@@ -242,6 +261,17 @@ private[kyuubi] class EngineRef(
defaultEngineName)
case CHAT =>
new ChatProcessBuilder(appUser, doAsEnabled, conf, engineRefId, extraEngineLog)
+ case DATA_AGENT =>
+ if (conf.get(ENGINE_DATA_AGENT_JDBC_URL).isEmpty) {
+ val haAddresses = conf.get(HA_ADDRESSES)
+ if (haAddresses.nonEmpty) {
+ val jdbcUrl = s"jdbc:hive2://$haAddresses/default;" +
+ s"serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=$serverSpace"
+ conf.set(ENGINE_DATA_AGENT_JDBC_URL.key, jdbcUrl)
+ info(s"Data Agent JDBC URL not configured, using Kyuubi server via ZK: $jdbcUrl")
+ }
+ }
+ new DataAgentProcessBuilder(appUser, doAsEnabled, conf, engineRefId, extraEngineLog)
}
MetricsSystem.tracing(_.incCount(ENGINE_TOTAL))
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/dataagent/DataAgentProcessBuilder.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/dataagent/DataAgentProcessBuilder.scala
new file mode 100644
index 00000000000..c68ec18d8b0
--- /dev/null
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/dataagent/DataAgentProcessBuilder.scala
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.dataagent
+
+import java.io.File
+import java.nio.file.{Files, Paths}
+
+import scala.collection.mutable
+
+import com.google.common.annotations.VisibleForTesting
+import org.apache.commons.lang3.StringUtils
+
+import org.apache.kyuubi.{Logging, SCALA_COMPILE_VERSION, Utils}
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.config.KyuubiConf._
+import org.apache.kyuubi.config.KyuubiReservedKeys.{KYUUBI_ENGINE_ID, KYUUBI_SESSION_USER_KEY}
+import org.apache.kyuubi.engine.ProcBuilder
+import org.apache.kyuubi.operation.log.OperationLog
+import org.apache.kyuubi.util.command.CommandLineUtils._
+
+ /**
+  * Assembles the command line used to launch the Data Agent engine, whose entry point is
+  * `org.apache.kyuubi.engine.dataagent.DataAgentEngine`.
+  */
+ class DataAgentProcessBuilder(
+     override val proxyUser: String,
+     override val doAsEnabled: Boolean,
+     override val conf: KyuubiConf,
+     val engineRefId: String,
+     val extraEngineLog: Option[OperationLog] = None)
+   extends ProcBuilder with Logging {
+
+   /** Test-only shortcut that passes an empty engine ref id. */
+   @VisibleForTesting
+   def this(proxyUser: String, doAsEnabled: Boolean, conf: KyuubiConf) =
+     this(proxyUser, doAsEnabled, conf, "")
+
+   override def shortName: String = "data-agent"
+
+   override protected def module: String = "kyuubi-data-agent-engine"
+
+   override protected def mainClass: String =
+     "org.apache.kyuubi.engine.dataagent.DataAgentEngine"
+
+   /**
+    * Launch command in order: executable, JVM flags, classpath option, main class, then the
+    * arguments derived from the session user, the engine ref id and all entries of `conf`.
+    */
+   override protected val commands: Iterable[String] = {
+     val cmd = new mutable.ListBuffer[String]()
+     cmd += executable
+     cmd += s"-Xmx${conf.get(ENGINE_DATA_AGENT_MEMORY)}"
+     cmd += "-Dfile.encoding=UTF-8"
+
+     // User-supplied JVM options are appended only when they are not blank
+     conf.get(ENGINE_DATA_AGENT_JAVA_OPTIONS)
+       .filter(StringUtils.isNotBlank(_))
+       .foreach(opts => cmd ++= parseOptionString(opts))
+
+     // Insertion-ordered set: engine resource first, then its dependencies, then extra entries
+     val classpath = new mutable.LinkedHashSet[String]
+     mainResource.foreach { resource =>
+       classpath.add(resource)
+       val resourceDir = Paths.get(resource).getParent
+       val devJars = resourceDir.resolve(s"scala-$SCALA_COMPILE_VERSION").resolve("jars")
+       // dev classpath if scala-<version>/jars exists beside the resource, otherwise the prod
+       // classpath, i.e. the resource's own directory
+       val depDir = if (Files.exists(devJars)) devJars else resourceDir
+       classpath.add(s"$depDir${File.separator}*")
+     }
+     conf.get(ENGINE_DATA_AGENT_EXTRA_CLASSPATH).foreach(classpath.add)
+     cmd ++= genClasspathOption(classpath)
+
+     cmd += mainClass
+     cmd ++= confKeyValue(KYUUBI_SESSION_USER_KEY, proxyUser)
+     cmd ++= confKeyValue(KYUUBI_ENGINE_ID, engineRefId)
+     cmd ++= confKeyValues(conf.getAll)
+
+     cmd
+   }
+
+   /**
+    * Renders the command with the LLM API key redacted; every argument starting with "-" and
+    * the main class are put on their own continuation line.
+    */
+   override def toString: String = commands match {
+     case null => super.toString
+     case cmd =>
+       val redacted = redactConfValues(
+         Utils.redactCommandLineArgs(conf, cmd),
+         Set(ENGINE_DATA_AGENT_LLM_API_KEY.key))
+       redacted
+         .map(arg => if (arg.startsWith("-") || arg == mainClass) s"\\\n\t$arg" else arg)
+         .mkString(" ")
+   }
+ }
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/dataagent/DataAgentProcessBuilderSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/dataagent/DataAgentProcessBuilderSuite.scala
new file mode 100644
index 00000000000..5c21c31d36c
--- /dev/null
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/dataagent/DataAgentProcessBuilderSuite.scala
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.dataagent
+
+import org.apache.kyuubi.KyuubiFunSuite
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.config.KyuubiConf._
+
+ class DataAgentProcessBuilderSuite extends KyuubiFunSuite {
+
+   /**
+    * Renders the command of a builder created for `user` on a fresh conf with `settings` applied.
+    */
+   private def commandLine(user: String, settings: (String, String)*): String = {
+     val conf = new KyuubiConf(false)
+     settings.foreach { case (key, value) => conf.set(key, value) }
+     new DataAgentProcessBuilder(user, doAsEnabled = false, conf).toString
+   }
+
+   // The engine entry point must show up in the rendered command
+   test("toString includes correct main class") {
+     val rendered = commandLine("testUser")
+     val entryPoint = "org.apache.kyuubi.engine.dataagent.DataAgentEngine"
+     assert(rendered.contains(entryPoint), s"Expected main class in toString output: $rendered")
+   }
+
+   // The secret must be gone and one of the accepted redaction markers present
+   test("API key is redacted in toString") {
+     val secret = "sk-secret-key-12345"
+     val rendered = commandLine("testUser", ENGINE_DATA_AGENT_LLM_API_KEY.key -> secret)
+     assert(!rendered.contains(secret), "API key should not appear in toString output")
+     val hasMarker = rendered.contains("**REDACTED**") || rendered.contains("********")
+     assert(hasMarker, s"toString should contain a redaction marker, got: $rendered")
+   }
+
+   // A configured heap size is forwarded as the -Xmx flag
+   test("memory flag uses configured value") {
+     val rendered = commandLine("testUser", ENGINE_DATA_AGENT_MEMORY.key -> "2g")
+     assert(rendered.contains("-Xmx2g"), s"Expected -Xmx2g in toString: $rendered")
+   }
+
+   // Without an explicit setting the -Xmx flag carries the 1g default
+   test("default memory is 1g") {
+     val rendered = commandLine("testUser")
+     assert(rendered.contains("-Xmx1g"), s"Expected -Xmx1g in toString: $rendered")
+   }
+
+   // The extra classpath entry is expected verbatim in the rendered command
+   test("extra classpath is included") {
+     val extraCp = "/extra/path/lib/*"
+     val rendered = commandLine("testUser", ENGINE_DATA_AGENT_EXTRA_CLASSPATH.key -> extraCp)
+     assert(rendered.contains(extraCp), s"Expected extra classpath in toString: $rendered")
+   }
+
+   // Every space-separated JVM option is expected in the rendered command
+   test("java options are included") {
+     val opts = "-Dfoo=bar -Dbaz=qux"
+     val rendered = commandLine("testUser", ENGINE_DATA_AGENT_JAVA_OPTIONS.key -> opts)
+     opts.split(" ").foreach { opt =>
+       assert(rendered.contains(opt), s"Expected $opt in toString: $rendered")
+     }
+   }
+
+   // The proxy user name is expected in the rendered command
+   test("proxy user is passed in commands") {
+     val rendered = commandLine("myUser")
+     assert(rendered.contains("myUser"), s"Expected proxy user in toString: $rendered")
+   }
+ }
diff --git a/pom.xml b/pom.xml
index ba5653202d7..db9b46e9132 100644
--- a/pom.xml
+++ b/pom.xml
@@ -65,6 +65,7 @@
extensions/spark/kyuubi-spark-lineage-shaded
extensions/spark/kyuubi-spark-jvm-quake
externals/kyuubi-chat-engine
+ externals/kyuubi-data-agent-engine
externals/kyuubi-download
externals/kyuubi-flink-sql-engine
externals/kyuubi-hive-sql-engine