17 changes: 17 additions & 0 deletions backend/src/analytics_agent/engines/factory.py
@@ -97,6 +97,23 @@ def build_mcp_config(self, connection: dict) -> dict:
required_keys=["account", "user"],
credential_keys=["password", "private_key", "pat_token"],
),
"hive": ConnectorSpec(
package="analytics-agent-connector-hive",
env_map={
"host": "HIVE_HOST",
"port": "HIVE_PORT",
"database": "HIVE_DATABASE",
"auth": "HIVE_AUTH",
"user": "HIVE_USER",
"password": "HIVE_PASSWORD",
"kerberos_service_name": "HIVE_KERBEROS_SERVICE_NAME",
},
secret_env_vars={
"password": "HIVE_PASSWORD",
},
required_keys=["host"],
credential_keys=["user", "password"],
),
"bigquery": ConnectorSpec(
package="analytics-agent-connector-bigquery",
env_map={
…
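
For context, each `ConnectorSpec` entry above declares how a saved connection dict becomes the environment handed to the connector subprocess. A minimal sketch of that translation, assuming only the fields visible in this diff (`env_map`, `secret_env_vars`, `required_keys`); the real `build_mcp_config` body is not part of this hunk:

```python
# Hypothetical sketch; not the actual build_mcp_config implementation.
def build_env(spec, connection: dict) -> dict[str, str]:
    missing = [k for k in spec.required_keys if not connection.get(k)]
    if missing:
        raise ValueError(f"Missing required connection keys: {missing}")
    env = {
        env_var: str(connection[key])
        for key, env_var in spec.env_map.items()
        if connection.get(key) is not None
    }
    # Values named in secret_env_vars (e.g. HIVE_PASSWORD) would normally be
    # resolved from a secret store rather than read off the plain dict.
    return env
```
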
3 changes: 2 additions & 1 deletion backend/src/analytics_agent/engines/sqlalchemy/engine.py
@@ -60,7 +60,8 @@ def _get_engine(self) -> Any:
from sqlalchemy import create_engine

url = self._build_url()
-        self._engine = create_engine(url)
+        connect_args = self._cfg.get("connect_args", {})
+        self._engine = create_engine(url, connect_args=connect_args)
logger.info("[SQLAlchemy] engine created for url=%s", repr(url))
return self._engine

…
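
The new `connect_args` pass-through lets per-connection driver options reach the DBAPI `connect()` call. An illustrative `_cfg` entry (keys and values here are assumptions, not taken from this diff):

```python
# Illustrative only: SQLAlchemy forwards connect_args verbatim to the
# underlying DBAPI driver's connect() call.
cfg = {
    "url": "hive://kyuubi.internal:10009/default",
    "connect_args": {"auth": "LDAP", "username": "analytics_user"},
}
```
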
30 changes: 30 additions & 0 deletions connectors/hive/README.md
@@ -0,0 +1,30 @@
# analytics-agent-connector-hive

Hive / Apache Kyuubi / Spark Thrift Server MCP connector for [Analytics Agent](https://github.com/datahub-project/analytics-agent).

Installed automatically when you add a Hive data source in the Analytics Agent UI. It can also be installed manually:

```bash
uv tool install analytics-agent-connector-hive
```

## Configuration

All configuration is read from environment variables set by the analytics-agent core when it launches the connector subprocess.

| Variable | Default | Description |
|---|---|---|
| `HIVE_HOST` | *(required)* | HiveServer2 / Kyuubi host |
| `HIVE_PORT` | `10000` | HiveServer2 port |
| `HIVE_DATABASE` | `default` | Default database |
| `HIVE_AUTH` | `NONE` | Auth mode: `NONE`, `NOSASL`, `LDAP`, `PLAIN`, `KERBEROS` |
| `HIVE_USER` | | Username (required for LDAP/PLAIN, recommended for KERBEROS) |
| `HIVE_PASSWORD` | | Password (LDAP/PLAIN only) |
| `HIVE_KERBEROS_SERVICE_NAME` | `hive` | Kerberos service principal prefix |
| `SQL_ROW_LIMIT` | `500` | Maximum rows returned per query |
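
For local testing outside the agent, the same variables can be supplied directly. A sketch of how the core launches the subprocess (illustrative values; the actual launch code is not shown in this PR):

```python
import os
import subprocess

# Mirrors the env the analytics-agent core would set (values illustrative).
env = {
    **os.environ,
    "HIVE_HOST": "localhost",
    "HIVE_PORT": "10000",
    "HIVE_DATABASE": "default",
    "HIVE_AUTH": "NONE",
    "SQL_ROW_LIMIT": "200",
}
subprocess.run(["uvx", "analytics-agent-connector-hive"], env=env)
```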

## Auth modes

- **NONE / NOSASL** — no credentials needed; typical for local or trusted-network deployments
- **LDAP / PLAIN** — username + password
- **KERBEROS** — requires `kerberos` system library (`brew install krb5` / `apt-get install libkrb5-dev`)
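
With `KERBEROS`, the connector builds the pyhive connection roughly as below (a sketch; it also assumes a valid ticket is already in the credential cache, e.g. via `kinit`):

```python
from pyhive import hive

# What HIVE_AUTH=KERBEROS translates to, given the defaults above
# (host is illustrative).
conn = hive.Connection(
    host="hive.internal",
    port=10000,
    database="default",
    auth="KERBEROS",
    kerberos_service_name="hive",  # HIVE_KERBEROS_SERVICE_NAME
)
```
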
0 changes: 0 additions & 0 deletions connectors/hive/analytics_agent_connector_hive/__init__.py
Empty file.
175 changes: 175 additions & 0 deletions connectors/hive/analytics_agent_connector_hive/server.py
@@ -0,0 +1,175 @@
"""Hive MCP connector for Analytics Agent.

Runs as a subprocess launched by the analytics-agent core via:
uvx analytics-agent-connector-hive

Reads all config from environment variables. Exposes 4 tools:
execute_sql, list_tables, get_schema, preview_table

Supported auth modes (HIVE_AUTH):
NONE — no authentication (default)
NOSASL — binary transport, no SASL wrapping
LDAP — username + password over SASL PLAIN
PLAIN — same as LDAP
KERBEROS — Kerberos/GSSAPI (requires kerberos system library)
"""

from __future__ import annotations

import logging
import os
from typing import Any

import orjson
from mcp.server.fastmcp import FastMCP

logger = logging.getLogger(__name__)

SQL_ROW_LIMIT = int(os.environ.get("SQL_ROW_LIMIT", "500"))

mcp = FastMCP("hive-connector")

# ── Connection ─────────────────────────────────────────────────────────────────

_conn: Any = None


def _get_connection():
global _conn
if _conn is None:
from pyhive import hive

host = os.environ.get("HIVE_HOST", "")
if not host:
raise RuntimeError("HIVE_HOST is not configured.")

kwargs: dict[str, Any] = {
"host": host,
"port": int(os.environ.get("HIVE_PORT", "10000")),
"database": os.environ.get("HIVE_DATABASE", "default"),
"auth": os.environ.get("HIVE_AUTH", "NONE").upper(),
}

user = os.environ.get("HIVE_USER", "")
password = os.environ.get("HIVE_PASSWORD", "")

if user:
kwargs["username"] = user
if password:
kwargs["password"] = password

kerberos_service = os.environ.get("HIVE_KERBEROS_SERVICE_NAME", "hive")
if kwargs["auth"] == "KERBEROS":
kwargs["kerberos_service_name"] = kerberos_service

_conn = hive.Connection(**kwargs)
return _conn


# ── SQL helpers ────────────────────────────────────────────────────────────────

def _coerce(v: Any) -> Any:
import datetime
from decimal import Decimal

if isinstance(v, Decimal):
return float(v) if v % 1 else int(v)
if isinstance(v, (datetime.datetime, datetime.date)):
return v.isoformat()
if isinstance(v, bytes):
return v.hex()
return v


def _apply_limit(sql: str, limit: int) -> str:
effective = sql.strip().rstrip(";")
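    # Naive guard: LIMIT is appended only when the word "LIMIT" appears
    # nowhere in the text, so a query that mentions it in an alias or a
    # string literal is passed through unchanged.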
if effective.lstrip().upper().startswith("SELECT") and "LIMIT" not in effective.upper():
return f"{effective} LIMIT {limit}"
return effective


def _run_query(sql: str, limit: int | None = None) -> dict:
effective_limit = limit or SQL_ROW_LIMIT
try:
conn = _get_connection()
except Exception as e:
return {"error": str(e), "columns": [], "rows": [], "truncated": False}

effective_sql = _apply_limit(sql, effective_limit)
try:
cursor = conn.cursor()
cursor.execute(effective_sql)
columns = [desc[0] for desc in cursor.description] if cursor.description else []
rows = cursor.fetchall()
truncated = len(rows) >= effective_limit
coerced = [
{c: _coerce(v) for c, v in zip(columns, row, strict=False)} for row in rows
]
return {"columns": columns, "rows": coerced, "truncated": truncated}
except Exception as e:
return {"error": str(e), "columns": [], "rows": [], "truncated": False}


# ── MCP tools ──────────────────────────────────────────────────────────────────

@mcp.tool()
def execute_sql(sql: str) -> str:
"""Execute a SQL query against the connected Hive/Kyuubi/Spark warehouse. Returns JSON with columns and rows."""
return orjson.dumps(_run_query(sql, SQL_ROW_LIMIT)).decode()


@mcp.tool()
def list_tables(schema: str = "") -> str:
"""List tables in the Hive database. Optionally filter by schema (database) name."""
try:
conn = _get_connection()
cursor = conn.cursor()
if schema:
cursor.execute(f"SHOW TABLES IN {schema}")
else:
cursor.execute("SHOW TABLES")
rows = cursor.fetchall()
# pyhive SHOW TABLES returns (database, tableName, isTemporary) in some versions
# and just (tableName,) in others — normalise both.
tables = []
for row in rows:
if len(row) >= 2:
tables.append({"schema": row[0], "name": row[1]})
else:
tables.append({"name": row[0]})
return orjson.dumps(tables).decode()
except Exception as e:
return orjson.dumps({"error": str(e)}).decode()


@mcp.tool()
def get_schema(table: str) -> str:
"""Get the column schema for a Hive table. Use db.table notation for cross-database lookup."""
try:
conn = _get_connection()
cursor = conn.cursor()
cursor.execute(f"DESCRIBE {table}")
rows = cursor.fetchall()
# DESCRIBE returns (col_name, data_type, comment)
columns = [
{"name": row[0], "type": row[1], "comment": row[2] if len(row) > 2 else ""}
for row in rows
if row[0] and not row[0].startswith("#") # skip partition/detail sections
]
return orjson.dumps(columns).decode()
except Exception as e:
return orjson.dumps({"error": str(e)}).decode()


@mcp.tool()
def preview_table(table: str, limit: int = 10) -> str:
"""Preview the first N rows of a Hive table."""
return orjson.dumps(_run_query(f"SELECT * FROM {table}", limit=limit)).decode()


def main() -> None:
mcp.run()


if __name__ == "__main__":
main()
20 changes: 20 additions & 0 deletions connectors/hive/pyproject.toml
@@ -0,0 +1,20 @@
[project]
name = "analytics-agent-connector-hive"
version = "0.1.0"
description = "Hive / Kyuubi / Spark Thrift Server MCP connector for Analytics Agent"
readme = "README.md"
requires-python = ">=3.10"
dependencies = [
"mcp>=1.0.0",
"pyhive[hive_pure_sasl]>=0.7.0",
"pure-sasl>=0.6.2",
"thrift-sasl>=0.4.3",
"orjson>=3.10.0",
]

[project.scripts]
analytics-agent-connector-hive = "analytics_agent_connector_hive.server:main"

[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
2 changes: 2 additions & 0 deletions frontend/src/components/Settings/connections/index.ts
@@ -4,6 +4,7 @@ export { AddConnectionFlow } from "./AddConnectionFlow";
import { snowflakePlugin } from "./plugins/snowflake";
import { snowflakeMcpPlugin } from "./plugins/snowflake-mcp";
import { bigqueryPlugin } from "./plugins/bigquery";
import { hivePlugin } from "./plugins/hive";
import { mysqlPlugin } from "./plugins/mysql";
import { postgresqlPlugin } from "./plugins/postgresql";
import { sqlitePlugin } from "./plugins/sqlite";
@@ -19,6 +20,7 @@ export const CONNECTION_PLUGINS: ConnectionPlugin[] = [
snowflakePlugin,
snowflakeMcpPlugin,
bigqueryPlugin,
hivePlugin,
mysqlPlugin,
postgresqlPlugin,
sqlitePlugin,
…
18 changes: 18 additions & 0 deletions frontend/src/components/Settings/connections/plugins/hive.tsx
@@ -0,0 +1,18 @@
import { createSimplePlugin } from "../helpers";

export const hivePlugin = createSimplePlugin({
id: "hive",
serviceId: "hive",
label: "Hive / Kyuubi / Spark",
category: "engine",
description: "Connect to HiveServer2, Apache Kyuubi, or Spark Thrift Server",
fields: [
{ key: "host", label: "Host", type: "mono", placeholder: "kyuubi-host or localhost", required: true },
{ key: "port", label: "Port", type: "mono", placeholder: "10000" },
{ key: "database", label: "Database", type: "mono", placeholder: "default" },
{ key: "auth", label: "Auth", type: "mono", placeholder: "NONE (or NOSASL, LDAP, KERBEROS)" },
Contributor
Missing kerberos_service_name

    { key: "kerberos_service_name", label: "Kerberos Service Name", type: "mono", placeholder: "hive" },
{ key: "user", label: "Username", type: "mono", placeholder: "analytics_user" },
{ key: "password", label: "Password", type: "password", placeholder: "LDAP/PLAIN only" },
{ key: "kerberos_service_name", label: "Kerberos Service Name", type: "mono", placeholder: "hive" },
],
});