Skip to content

Latest commit

 

History

History
375 lines (284 loc) · 19 KB

File metadata and controls

375 lines (284 loc) · 19 KB

构建状态 PyPI Downloads Discord Twitter

chDB

English

chDB 是一个由 ClickHouse 驱动的嵌入式 SQL OLAP 引擎。更多细节:chDB: ClickHouse as a Function

特点

  • 嵌入在 Python 中的 SQL OLAP 引擎,由 ClickHouse 驱动
  • 不需要安装 ClickHouse
  • 支持 Parquet、CSV、JSON、Arrow、ORC 和其他 60 多种格式的输入输出示例
  • 支持 Python DB API 2.0 标准, example

架构

安装方式

目前,chDB 只支持在 macOS(x86_64 和 ARM64)和 Linux 上的 Python 3.9+。

pip install chdb

用法

在命令行中运行

python3 -m chdb SQL [OutputFormat]

python3 -m chdb "SELECT 1,'abc'" Pretty

有三种使用 chdb 的方法:“原始文件查询(性能)”、“高级查询(推荐)”和“DB-API”:

🗂️ 原始文件查询

(Parquet、CSV、JSON、Arrow、ORC 等 60 多种格式)

您可以执行 SQL 并返回所需格式的数据。

import chdb
res = chdb.query('select version()', 'Pretty'); print(res)

使用 Parquet 或 CSV

# 查看更多数据类型格式,请参见 tests/format_output.py
res = chdb.query('select * from file("data.parquet", Parquet)', 'JSON'); print(res)
res = chdb.query('select * from file("data.csv", CSV)', 'CSV');  print(res)
print(f"SQL read {res.rows_read()} rows, {res.bytes_read()} bytes, elapsed {res.elapsed()} seconds")

参数化查询

import chdb

df = chdb.query(
    "SELECT toDate({base_date:String}) + number AS date "
    "FROM numbers({total_days:UInt64}) "
    "LIMIT {items_per_page:UInt64}",
    "DataFrame",
    params={"base_date": "2025-01-01", "total_days": 10, "items_per_page": 2},
)
print(df)
#         date
# 0 2025-01-01
# 1 2025-01-02

查询进度(progress=auto

import chdb

# Connection API
conn = chdb.connect(":memory:?progress=auto")
conn.query("SELECT sum(number) FROM numbers_mt(1e10) GROUP BY number % 10 SETTINGS max_threads=4")
import chdb

# 一次性 query API
res = chdb.query(
    "SELECT sum(number) FROM numbers_mt(1e10) GROUP BY number % 10 SETTINGS max_threads=4",
    options={"progress": "auto"},
)

progress=auto 的行为:

  • 在终端运行时:在终端中显示文本进度更新。
  • 在 Jupyter/Marimo 中:在 notebook 输出区域渲染进度。

其他进度选项:

  • 进度条:
    • progress=tty:将进度输出到终端 TTY。
    • progress=err:将进度输出到 stderr
    • progress=off:关闭进度条输出。
  • 进度表(终端输出):
    • progress-table=tty:将进度表输出到终端 TTY。
    • progress-table=err:将进度表输出到 stderr
    • progress-table=off:关闭进度表输出。

更多内容请参见:

Pandas DataFrame 输出

# 更多内容请参见 https://clickhouse.com/docs/en/interfaces/formats
chdb.query('select * from file("data.parquet", Parquet)', 'Dataframe')

🗂️ 高级查询

(Pandas DataFrame、Parquet 文件/字节、Arrow 文件/字节)

查询 Pandas DataFrame

import chdb.dataframe as cdf
import pandas as pd
# Join 2 DataFrames
df1 = pd.DataFrame({'a': [1, 2, 3], 'b': ["one", "two", "three"]})
df2 = pd.DataFrame({'c': [1, 2, 3], 'd': ["①", "②", "③"]})
ret_tbl = cdf.query(sql="select * from __tbl1__ t1 join __tbl2__ t2 on t1.a = t2.c",
                  tbl1=df1, tbl2=df2)
print(ret_tbl)
# Query on the DataFrame Table
print(ret_tbl.query('select b, sum(a) from __table__ group by b'))

🗂️ 基于有状态会话 Session 查询

from chdb import session as chs

## 在临时会话中创建DB, Table, View,当会话被删除时自动清除。
sess = chs.Session()
sess.query("CREATE DATABASE IF NOT EXISTS db_xxx ENGINE = Atomic")
sess.query("CREATE TABLE IF NOT EXISTS db_xxx.log_table_xxx (x String, y Int) ENGINE = Log;")
sess.query("INSERT INTO db_xxx.log_table_xxx VALUES ('a', 1), ('b', 3), ('c', 2), ('d', 5);")
sess.query(
    "CREATE VIEW db_xxx.view_xxx AS SELECT * FROM db_xxx.log_table_xxx LIMIT 4;"
)
print("Select from view:\n")
print(sess.query("SELECT * FROM db_xxx.view_xxx", "Pretty"))

参见: test_stateful.py

🗂️ Python DB-API 2.0

import chdb.dbapi as dbapi
print("chdb driver version: {0}".format(dbapi.get_client_info()))

conn1 = dbapi.connect()
cur1 = conn1.cursor()
cur1.execute('select version()')
print("description: ", cur1.description)
print("data: ", cur1.fetchone())
cur1.close()
conn1.close()

🗂️ Query with UDF(User Defined Functions)

from chdb.udf import chdb_udf
from chdb import query

@chdb_udf()
def sum_udf(lhs, rhs):
    return int(lhs) + int(rhs)

print(query("select sum_udf(12,22)"))

参见: test_udf.py.

🗂️ 流式查询

通过分块流式处理大数据集,保持内存使用恒定。

from chdb import session as chs

sess = chs.Session()

# 示例1:流式查询基础用法
rows_cnt = 0
with sess.send_query("SELECT * FROM numbers(200000)", "CSV") as stream_result:
    for chunk in stream_result:
        rows_cnt += chunk.rows_read()

print(rows_cnt) # 200000

# 示例2:使用fetch()手动迭代
rows_cnt = 0
stream_result = sess.send_query("SELECT * FROM numbers(200000)", "CSV")
while True:
    chunk = stream_result.fetch()
    if chunk is None:
        break
    rows_cnt += chunk.rows_read()

print(rows_cnt) # 200000

# 示例3:提前取消查询
rows_cnt = 0
stream_result = sess.send_query("SELECT * FROM numbers(200000)", "CSV")
while True:
    chunk = stream_result.fetch()
    if chunk is None:
        break
    if rows_cnt > 0:
        stream_result.close()
        break
    rows_cnt += chunk.rows_read()

print(rows_cnt) # 65409

# 示例4:使用PyArrow RecordBatchReader进行批量导出以及与其他库集成
import pyarrow as pa
from deltalake import write_deltalake

# 获取arrow格式的流式结果
stream_result = sess.send_query("SELECT * FROM numbers(100000)", "Arrow")

# 创建自定义批次大小的RecordBatchReader(默认rows_per_batch=1000000)
batch_reader = stream_result.record_batch(rows_per_batch=10000)

# 将RecordBatchReader与外部库(如Delta Lake)一起使用
write_deltalake(
    table_or_uri="./my_delta_table",
    data=batch_reader,
    mode="overwrite"
)

stream_result.close()

sess.close()

重要提示:使用流式查询时,如果StreamingResult没有被完全消耗(由于错误或提前终止),必须显式调用stream_result.close()来释放资源,或使用with语句进行自动清理。否则可能会阻塞后续查询。

参见: test_streaming_query.pytest_arrow_record_reader_deltalake.py

更多示例,请参见 examplestests

🧠 AI 辅助 SQL 生成

chDB 可以将自然语言提示转换为 SQL。通过连接/会话字符串配置 AI 客户端参数:

  • ai_provideropenaianthropic。当设置了 ai_base_url 时默认使用 OpenAI 兼容接口,否则自动检测。
  • ai_api_key:API 密钥;也可从环境变量 AI_API_KEYOPENAI_API_KEYANTHROPIC_API_KEY 读取。
  • ai_base_url:OpenAI 兼容服务的自定义 Base URL。
  • ai_model:模型名称(如 gpt-4o-miniclaude-3-opus-20240229)。
  • ai_temperature:生成温度,默认 0.0
  • ai_max_tokens:最大全量生成 token 数,默认 1000
  • ai_timeout_seconds:请求超时时间(秒),默认 30
  • ai_system_prompt:自定义系统提示词。
  • ai_max_steps:工具调用的最大步数,默认 5
  • ai_enable_schema_access:允许 AI 查看数据库/表元数据,默认 true

未开启 AI 或配置缺失时,调用 generate_sql/ask 会抛出 RuntimeError

import chdb

# 使用环境变量 OPENAI_API_KEY/AI_API_KEY/ANTHROPIC_API_KEY 提供凭据
conn = chdb.connect("file::memory:?ai_provider=openai&ai_model=gpt-4o-mini")
conn.query("CREATE TABLE nums (n UInt32) ENGINE = Memory")
conn.query("INSERT INTO nums VALUES (1), (2), (3)")

sql = conn.generate_sql("Select all rows from nums ordered by n desc")
print(sql)  # 例如:SELECT * FROM nums ORDER BY n DESC

# ask():一键生成并执行 SQL
# `ask()` 会先调用 `generate_sql` 再执行 `query`,关键字参数会透传给 `query`。
print(conn.ask("List the numbers table", format="Pretty"))

Session 同样支持以上能力;Session.ask() 会将关键字参数透传给 Session.query

from chdb import session as chs

with chs.Session("file::memory:?ai_provider=openai") as sess:
    sess.query("CREATE TABLE users (id UInt32, name String) ENGINE = Memory")
    sess.query("INSERT INTO users VALUES (1, 'alice'), (2, 'bob')")
    df = sess.ask("Show all users ordered by id", format="DataFrame")
    print(df)

演示和示例

基准测试

文档

贡献

贡献是使开源社区成为一个学习、激励和创造的绝佳场所的原因。您做出的任何贡献都将受到高度赞赏。 以下是您可以提供帮助的事项:

  • 「Star」和「分享」
  • 帮助测试和报告错误
  • 帮助改进文档
  • 帮助提高代码质量和性能

事件

版本说明

请查看 VERSION-GUIDE.md 获取更多信息。

相关论文

版权信息

Apache 2.0,请查看 LICENSE 获取更多信息。

鸣谢

chDB 主要基于 ClickHouse。由于商标和其他原因,我将其命名为 chDB。

联系方式