diff --git a/README.md b/README.md index 8ae4f1831d1..8ee70d24ba9 100644 --- a/README.md +++ b/README.md @@ -157,6 +157,8 @@ Flags: --grpc-headers=KEY=VALUE;... Additional gRPC headers to send with each request to the remote store (key=value pairs). + --storage-backend="clickhouse" + Storage backend for profile data. --clickhouse-address="localhost:9000" ClickHouse server address. --clickhouse-database="parca" @@ -167,6 +169,10 @@ Flags: --clickhouse-table="stacktraces" ClickHouse table name for profile data. --clickhouse-secure Use TLS for ClickHouse connection. + --duckdb-path="" Filesystem path for the DuckDB database file. + Empty means an in-memory database (volatile). + --duckdb-table="stacktraces" + DuckDB table name for profile data. ``` diff --git a/go.mod b/go.mod index fd60e9600cd..7ca93e27bf2 100644 --- a/go.mod +++ b/go.mod @@ -120,6 +120,12 @@ require ( github.com/docker/docker v28.5.2+incompatible // indirect github.com/docker/go-connections v0.6.0 // indirect github.com/docker/go-units v0.5.0 // indirect + github.com/duckdb/duckdb-go-bindings v0.1.21 // indirect + github.com/duckdb/duckdb-go-bindings/darwin-amd64 v0.1.21 // indirect + github.com/duckdb/duckdb-go-bindings/darwin-arm64 v0.1.21 // indirect + github.com/duckdb/duckdb-go-bindings/linux-amd64 v0.1.21 // indirect + github.com/duckdb/duckdb-go-bindings/linux-arm64 v0.1.21 // indirect + github.com/duckdb/duckdb-go-bindings/windows-amd64 v0.1.21 // indirect github.com/dustin/go-humanize v1.0.1 // indirect github.com/efficientgo/core v1.0.0-rc.2 // indirect github.com/emicklei/go-restful/v3 v3.11.0 // indirect @@ -139,6 +145,7 @@ require ( github.com/go-openapi/jsonreference v0.21.0 // indirect github.com/go-openapi/swag v0.23.0 // indirect github.com/go-resty/resty/v2 v2.16.5 // indirect + github.com/go-viper/mapstructure/v2 v2.4.0 // indirect github.com/go-zookeeper/zk v1.0.4 // indirect github.com/goccy/go-json v0.10.5 // indirect github.com/gofrs/flock v0.8.1 // indirect @@ -183,6 +190,9 @@ require ( github.com/kylelemons/godebug v1.1.0 // indirect github.com/linode/linodego v1.52.1 // indirect github.com/mailru/easyjson v0.7.7 // indirect + github.com/marcboeker/go-duckdb/arrowmapping v0.0.21 // indirect + github.com/marcboeker/go-duckdb/mapping v0.0.21 // indirect + github.com/marcboeker/go-duckdb/v2 v2.4.3 // indirect github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.20 // indirect github.com/mattn/go-runewidth v0.0.16 // indirect diff --git a/go.sum b/go.sum index d2d92f765e6..9876427d907 100644 --- a/go.sum +++ b/go.sum @@ -225,6 +225,18 @@ github.com/docker/go-connections v0.6.0 h1:LlMG9azAe1TqfR7sO+NJttz1gy6KO7VJBh+pM github.com/docker/go-connections v0.6.0/go.mod h1:AahvXYshr6JgfUJGdDCs2b5EZG/vmaMAntpSFH5BFKE= github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4= github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= +github.com/duckdb/duckdb-go-bindings v0.1.21 h1:bOb/MXNT4PN5JBZ7wpNg6hrj9+cuDjWDa4ee9UdbVyI= +github.com/duckdb/duckdb-go-bindings v0.1.21/go.mod h1:pBnfviMzANT/9hi4bg+zW4ykRZZPCXlVuvBWEcZofkc= +github.com/duckdb/duckdb-go-bindings/darwin-amd64 v0.1.21 h1:Sjjhf2F/zCjPF53c2VXOSKk0PzieMriSoyr5wfvr9d8= +github.com/duckdb/duckdb-go-bindings/darwin-amd64 v0.1.21/go.mod h1:Ezo7IbAfB8NP7CqPIN8XEHKUg5xdRRQhcPPlCXImXYA= +github.com/duckdb/duckdb-go-bindings/darwin-arm64 v0.1.21 h1:IUk0FFUB6dpWLhlN9hY1mmdPX7Hkn3QpyrAmn8pmS8g= +github.com/duckdb/duckdb-go-bindings/darwin-arm64 v0.1.21/go.mod h1:eS7m/mLnPQgVF4za1+xTyorKRBuK0/BA44Oy6DgrGXI= +github.com/duckdb/duckdb-go-bindings/linux-amd64 v0.1.21 h1:Qpc7ZE3n6Nwz30KTvaAwI6nGkXjXmMxBTdFpC8zDEYI= +github.com/duckdb/duckdb-go-bindings/linux-amd64 v0.1.21/go.mod h1:1GOuk1PixiESxLaCGFhag+oFi7aP+9W8byymRAvunBk= +github.com/duckdb/duckdb-go-bindings/linux-arm64 v0.1.21 h1:eX2DhobAZOgjXkh8lPnKAyrxj8gXd2nm+K71f6KV/mo= +github.com/duckdb/duckdb-go-bindings/linux-arm64 v0.1.21/go.mod h1:o7crKMpT2eOIi5/FY6HPqaXcvieeLSqdXXaXbruGX7w= +github.com/duckdb/duckdb-go-bindings/windows-amd64 v0.1.21 h1:hhziFnGV7mpA+v5J5G2JnYQ+UWCCP3NQ+OTvxFX10D8= +github.com/duckdb/duckdb-go-bindings/windows-amd64 v0.1.21/go.mod h1:IlOhJdVKUJCAPj3QsDszUo8DVdvp1nBFp4TUJVdw99s= github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= @@ -332,6 +344,8 @@ github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1vB6EwHI= github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8= +github.com/go-viper/mapstructure/v2 v2.4.0 h1:EBsztssimR/CONLSZZ04E8qAkxNYq4Qp9LvH92wZUgs= +github.com/go-viper/mapstructure/v2 v2.4.0/go.mod h1:oJDH3BJKyqBA2TXFhDsKDGDTlndYOZ6rGS0BRZIxGhM= github.com/go-zookeeper/zk v1.0.4 h1:DPzxraQx7OrPyXq2phlGlNSIyWEsAox0RJmjTseMV6I= github.com/go-zookeeper/zk v1.0.4/go.mod h1:nOB03cncLtlp4t+UAkGSV+9beXP/akpekBwL+UX1Qcw= github.com/gobwas/httphead v0.0.0-20180130184737-2c6c146eadee/go.mod h1:L0fX3K22YWvt/FAX9NnzrNzcI4wNYi9Yku4O0LKYflo= @@ -587,6 +601,12 @@ github.com/m1gwings/treedrawer v0.3.3-beta h1:VeeQ4I90+NL0G2Tga3H4EY4hbOyVP3ID4T github.com/m1gwings/treedrawer v0.3.3-beta/go.mod h1:Sebh5tCtjQWAG/B9xWct163vB9pCbBcA1ykaUErDUTY= github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0= github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc= +github.com/marcboeker/go-duckdb/arrowmapping v0.0.21 h1:geHnVjlsAJGczSWEqYigy/7ARuD+eBtjd0kLN80SPJQ= +github.com/marcboeker/go-duckdb/arrowmapping v0.0.21/go.mod h1:flFTc9MSqQCh2Xm62RYvG3Kyj29h7OtsTb6zUx1CdK8= +github.com/marcboeker/go-duckdb/mapping v0.0.21 h1:6woNXZn8EfYdc9Vbv0qR6acnt0TM1s1eFqnrJZVrqEs= +github.com/marcboeker/go-duckdb/mapping v0.0.21/go.mod h1:q3smhpLyv2yfgkQd7gGHMd+H/Z905y+WYIUjrl29vT4= +github.com/marcboeker/go-duckdb/v2 v2.4.3 h1:bHUkphPsAp2Bh/VFEdiprGpUekxBNZiWWtK+Bv/ljRk= +github.com/marcboeker/go-duckdb/v2 v2.4.3/go.mod h1:taim9Hktg2igHdNBmg5vgTfHAlV26z3gBI0QXQOcuyI= github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU= github.com/mattn/go-colorable v0.1.4/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE= github.com/mattn/go-colorable v0.1.6/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc= diff --git a/pkg/duckdb/client.go b/pkg/duckdb/client.go new file mode 100644 index 00000000000..1d3af24da9a --- /dev/null +++ b/pkg/duckdb/client.go @@ -0,0 +1,73 @@ +// Copyright 2026 The Parca Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package duckdb + +import ( + "context" + "database/sql" + "fmt" + + _ "github.com/marcboeker/go-duckdb/v2" +) + +// Config holds DuckDB connection configuration. +// +// Path is the on-disk file path. An empty Path uses an in-memory database. +// Table is the name of the profile data table. +type Config struct { + Path string + Table string +} + +// Client wraps a DuckDB database/sql connection. +type Client struct { + db *sql.DB + cfg Config +} + +// NewClient opens a DuckDB connection at cfg.Path (file) or in memory if +// the path is empty. The on-disk file is created if it doesn't exist. +func NewClient(_ context.Context, cfg Config) (*Client, error) { + dsn := cfg.Path // empty string == in-memory per duckdb-go convention + db, err := sql.Open("duckdb", dsn) + if err != nil { + return nil, fmt.Errorf("open duckdb: %w", err) + } + + // DuckDB is single-writer per process. Pin connection count to 1 so + // every Appender / Query lands on the same connection and we don't + // race against a transient one. Reads still work fine because the + // embedded engine is single-process anyway. + db.SetMaxOpenConns(1) + db.SetMaxIdleConns(1) + + return &Client{db: db, cfg: cfg}, nil +} + +// Close closes the underlying database/sql connection. +func (c *Client) Close() error { return c.db.Close() } + +// DB returns the underlying *sql.DB. +func (c *Client) DB() *sql.DB { return c.db } + +// Table returns the profile data table name. +func (c *Client) Table() string { return c.cfg.Table } + +// EnsureSchema creates the profile data table if it doesn't already exist. +func (c *Client) EnsureSchema(ctx context.Context) error { + if _, err := c.db.ExecContext(ctx, CreateTableSQL(c.cfg.Table)); err != nil { + return fmt.Errorf("create table %q: %w", c.cfg.Table, err) + } + return nil +} diff --git a/pkg/duckdb/duckdb_test.go b/pkg/duckdb/duckdb_test.go new file mode 100644 index 00000000000..7ecb55653a8 --- /dev/null +++ b/pkg/duckdb/duckdb_test.go @@ -0,0 +1,217 @@ +// Copyright 2026 The Parca Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package duckdb_test + +import ( + "context" + "sort" + "testing" + "time" + + "github.com/apache/arrow-go/v18/arrow" + "github.com/apache/arrow-go/v18/arrow/array" + "github.com/apache/arrow-go/v18/arrow/memory" + "github.com/go-kit/log" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/trace/noop" + + "github.com/parca-dev/parca/pkg/duckdb" + "github.com/parca-dev/parca/pkg/profile" + "github.com/parca-dev/parca/pkg/symbolizer" +) + +// nopSymbolizer is a no-op symbolizer for tests — synthetic rows ingested +// here already carry resolved function info. +type nopSymbolizer struct{} + +func (nopSymbolizer) Symbolize(context.Context, symbolizer.SymbolizationRequest) error { return nil } + +// buildSampleRecord builds a one-row Arrow record matching the parca write +// schema. Caller must Release() the returned record. +func buildSampleRecord(t *testing.T, mem memory.Allocator, ts int64) arrow.RecordBatch { + t.Helper() + + schema := profile.BuildArrowSchema([]string{"job"}) + b := array.NewRecordBuilder(mem, schema) + defer b.Release() + + for i, field := range schema.Fields() { + switch field.Name { + case profile.ColumnDuration: + b.Field(i).(*array.Int64Builder).Append(int64(time.Second)) + case profile.ColumnName: + require.NoError(t, b.Field(i).(*array.BinaryDictionaryBuilder).AppendString("process_cpu")) + case profile.ColumnPeriod: + b.Field(i).(*array.Int64Builder).Append(10_000_000) + case profile.ColumnPeriodType: + require.NoError(t, b.Field(i).(*array.BinaryDictionaryBuilder).AppendString("cpu")) + case profile.ColumnPeriodUnit: + require.NoError(t, b.Field(i).(*array.BinaryDictionaryBuilder).AppendString("nanoseconds")) + case profile.ColumnSampleType: + require.NoError(t, b.Field(i).(*array.BinaryDictionaryBuilder).AppendString("cpu")) + case profile.ColumnSampleUnit: + require.NoError(t, b.Field(i).(*array.BinaryDictionaryBuilder).AppendString("nanoseconds")) + case profile.ColumnStacktrace: + lb := b.Field(i).(*array.ListBuilder) + vb := lb.ValueBuilder().(*array.BinaryDictionaryBuilder) + lb.Append(true) + require.NoError(t, vb.Append(encodeLocation(0xdeadbeef, "test-build-id", "/lib/test", "main.run"))) + case profile.ColumnTimestamp: + b.Field(i).(*array.Int64Builder).Append(ts) + case profile.ColumnTimeNanos: + b.Field(i).(*array.Int64Builder).Append(ts * int64(time.Millisecond)) + case profile.ColumnValue: + b.Field(i).(*array.Int64Builder).Append(42) + case profile.ColumnLabelsPrefix + "job": + require.NoError(t, b.Field(i).(*array.BinaryDictionaryBuilder).AppendString("test")) + } + } + + return b.NewRecordBatch() +} + +// encodeLocation produces a varint-encoded location blob in the same +// shape produced by the symbolizer and decoded by the ingester. +// +// Layout (matches pkg/profile + pkg/clickhouse decoders): +// +// addr (uvarint) +// numLines (uvarint) +// hasMapping byte (0|1) +// if hasMapping: buildID (len+bytes), filename (len+bytes), 3 zero uvarints +// per line: +// lineNumber (uvarint) +// hasFunction byte (0|1) +// if hasFunction: startLine (uvarint), name, systemName, filename (each len+bytes) +func encodeLocation(addr uint64, buildID, mappingFile, fnName string) []byte { + var out []byte + out = appendUvarint(out, addr) + out = appendUvarint(out, 1) // 1 line + out = append(out, 0x01) // hasMapping + out = appendBytes(out, []byte(buildID)) + out = appendBytes(out, []byte(mappingFile)) + out = appendUvarint(out, 0) // memoryStart + out = appendUvarint(out, 0) // memoryLength + out = appendUvarint(out, 0) // mappingOffset + + out = appendUvarint(out, 7) // line number + out = append(out, 0x01) // hasFunction + out = appendUvarint(out, 1) // startLine + out = appendBytes(out, []byte(fnName)) + out = appendBytes(out, []byte(fnName)) // system name + out = appendBytes(out, []byte("main.go")) + return out +} + +func appendUvarint(b []byte, v uint64) []byte { + for v >= 0x80 { + b = append(b, byte(v)|0x80) + v >>= 7 + } + return append(b, byte(v)) +} + +func appendBytes(b []byte, payload []byte) []byte { + b = appendUvarint(b, uint64(len(payload))) + return append(b, payload...) +} + +func newTestClient(t *testing.T) *duckdb.Client { + t.Helper() + c, err := duckdb.NewClient(context.Background(), duckdb.Config{Path: "", Table: "stacktraces"}) + require.NoError(t, err) + t.Cleanup(func() { _ = c.Close() }) + require.NoError(t, c.EnsureSchema(context.Background())) + return c +} + +// TestIngestAndQueryRoundTrip ingests one synthetic profile row and +// verifies the live Querier methods return sensible results against it. +func TestIngestAndQueryRoundTrip(t *testing.T) { + mem := memory.NewCheckedAllocator(memory.DefaultAllocator) + defer mem.AssertSize(t, 0) + + client := newTestClient(t) + ctx := context.Background() + logger := log.NewNopLogger() + tracer := noop.NewTracerProvider().Tracer("") + + const tsMillis int64 = 1_700_000_000_000 + rec := buildSampleRecord(t, mem, tsMillis) + defer rec.Release() + + ing := duckdb.NewIngester(logger, client) + require.NoError(t, ing.Ingest(ctx, rec)) + + q := duckdb.NewQuerier(client, logger, tracer, mem, nopSymbolizer{}) + + // HasProfileData / ProfileTypes + has, err := q.HasProfileData(ctx) + require.NoError(t, err) + require.True(t, has) + + types, err := q.ProfileTypes(ctx, time.UnixMilli(0), time.UnixMilli(0)) + require.NoError(t, err) + require.Len(t, types, 1) + require.Equal(t, "process_cpu", types[0].Name) + require.Equal(t, "cpu", types[0].SampleType) + require.True(t, types[0].Delta) + + // Labels / Values + labels, err := q.Labels(ctx, nil, time.UnixMilli(0), time.UnixMilli(0), "") + require.NoError(t, err) + require.Equal(t, []string{"job"}, labels) + + values, err := q.Values(ctx, "job", nil, time.UnixMilli(0), time.UnixMilli(0), "") + require.NoError(t, err) + sort.Strings(values) + require.Equal(t, []string{"test"}, values) + + // QueryRange — wide window covering tsMillis. + start := time.UnixMilli(tsMillis - 1_000) + end := time.UnixMilli(tsMillis + 1_000) + queryStr := `process_cpu:cpu:nanoseconds:cpu:nanoseconds:delta{job="test"}` + series, err := q.QueryRange(ctx, queryStr, start, end, time.Second, 0, []string{"job"}) + require.NoError(t, err) + require.Len(t, series, 1) + require.Equal(t, "job", series[0].Labelset.Labels[0].Name) + require.Equal(t, "test", series[0].Labelset.Labels[0].Value) + require.NotEmpty(t, series[0].Samples) + require.Equal(t, int64(42), series[0].Samples[0].Value) + + // QuerySingle — exact timestamp match. + single, err := q.QuerySingle(ctx, queryStr, time.UnixMilli(tsMillis), false) + require.NoError(t, err) + require.NotEmpty(t, single.Samples) + for _, r := range single.Samples { + r.Release() + } + + // QueryMerge — sum across the window. + merged, err := q.QueryMerge(ctx, queryStr, start, end, []string{"job"}, false, "") + require.NoError(t, err) + require.NotEmpty(t, merged.Samples) + for _, r := range merged.Samples { + r.Release() + } + + // Metadata helpers + mappings, err := q.GetProfileMetadataMappings(ctx, queryStr, start, end) + require.NoError(t, err) + require.Equal(t, []string{"/lib/test"}, mappings) + + metaLabels, err := q.GetProfileMetadataLabels(ctx, queryStr, start, end) + require.NoError(t, err) + require.Equal(t, []string{"job"}, metaLabels) +} diff --git a/pkg/duckdb/filter.go b/pkg/duckdb/filter.go new file mode 100644 index 00000000000..7d162499478 --- /dev/null +++ b/pkg/duckdb/filter.go @@ -0,0 +1,173 @@ +// Copyright 2026 The Parca Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package duckdb + +import ( + "fmt" + "strings" + + "github.com/prometheus/prometheus/model/labels" + + "github.com/parca-dev/parca/pkg/profile" +) + +// ProfileTypeFilter returns SQL WHERE-fragment conditions for the profile +// type identifier (name + sample_type + sample_unit + period_type + +// period_unit + delta-by-duration), plus the matching argument list. +func ProfileTypeFilter(qp profile.QueryParts) (string, []interface{}) { + conditions := []string{ + "name = ?", + "sample_type = ?", + "sample_unit = ?", + "period_type = ?", + "period_unit = ?", + } + args := []interface{}{ + qp.Meta.Name, + qp.Meta.SampleType.Type, + qp.Meta.SampleType.Unit, + qp.Meta.PeriodType.Type, + qp.Meta.PeriodType.Unit, + } + + if qp.Delta { + conditions = append(conditions, "duration != 0") + } else { + conditions = append(conditions, "duration = 0") + } + + return strings.Join(conditions, " AND "), args +} + +// LabelMatchersToSQL translates Prometheus label matchers to DuckDB SQL. +// +// We model labels as MAP(VARCHAR, VARCHAR). Reads use: +// +// element_at(labels, 'foo')[1] +// +// which returns NULL when the key is absent. (`labels['foo']` works on +// recent DuckDB versions but `element_at` is the canonical, version-stable +// form.) Regex uses DuckDB's `regexp_matches`. +func LabelMatchersToSQL(matchers []*labels.Matcher) (string, []interface{}, error) { + if len(matchers) == 0 { + return "", nil, nil + } + + conditions := make([]string, 0, len(matchers)) + args := make([]interface{}, 0, len(matchers)) + + for _, m := range matchers { + condition, arg, err := matcherToSQL(m) + if err != nil { + return "", nil, err + } + conditions = append(conditions, condition) + if arg != nil { + args = append(args, arg) + } + } + + return strings.Join(conditions, " AND "), args, nil +} + +func matcherToSQL(m *labels.Matcher) (string, interface{}, error) { + // element_at(labels, 'k')[1] yields NULL when the key is missing, + // which is what we want for "label absent" semantics. + labelExpr := fmt.Sprintf("element_at(labels, '%s')[1]", escapeIdent(m.Name)) + + switch m.Type { + case labels.MatchEqual: + if m.Value == "" { + return fmt.Sprintf("(%s IS NULL OR %s = '')", labelExpr, labelExpr), nil, nil + } + return fmt.Sprintf("%s = ?", labelExpr), m.Value, nil + + case labels.MatchNotEqual: + if m.Value == "" { + return fmt.Sprintf("(%s IS NOT NULL AND %s != '')", labelExpr, labelExpr), nil, nil + } + return fmt.Sprintf("(%s != ? OR %s IS NULL)", labelExpr, labelExpr), m.Value, nil + + case labels.MatchRegexp: + return fmt.Sprintf("regexp_matches(coalesce(%s, ''), ?)", labelExpr), m.Value, nil + + case labels.MatchNotRegexp: + return fmt.Sprintf("NOT regexp_matches(coalesce(%s, ''), ?)", labelExpr), m.Value, nil + + default: + return "", nil, fmt.Errorf("unsupported matcher type: %v", m.Type) + } +} + +// TimeRangeFilter returns the WHERE-fragment for a time-range filter on +// time_nanos. +func TimeRangeFilter(startNanos, endNanos int64) (string, []interface{}) { + return "time_nanos >= ? AND time_nanos <= ?", []interface{}{startNanos, endNanos} +} + +// BuildWhereClause stitches non-empty fragments into a single WHERE clause. +func BuildWhereClause(conditions []string, allArgs []interface{}) (string, []interface{}) { + nonEmpty := make([]string, 0, len(conditions)) + for _, c := range conditions { + if c != "" { + nonEmpty = append(nonEmpty, c) + } + } + if len(nonEmpty) == 0 { + return "", nil + } + return "WHERE " + strings.Join(nonEmpty, " AND "), allArgs +} + +// QueryToFilters parses a parca query string and returns the WHERE clause +// + bind args + parsed query parts for the given time range. +func QueryToFilters(query string, startNanos, endNanos int64) (string, []interface{}, profile.QueryParts, error) { + qp, err := profile.ParseQuery(query) + if err != nil { + return "", nil, qp, err + } + + profileFilter, profileArgs := ProfileTypeFilter(qp) + + labelFilter, labelArgs, err := LabelMatchersToSQL(qp.Matchers) + if err != nil { + return "", nil, qp, err + } + + timeFilter, timeArgs := TimeRangeFilter(startNanos, endNanos) + + conditions := []string{profileFilter} + args := append([]interface{}{}, profileArgs...) + + if labelFilter != "" { + conditions = append(conditions, labelFilter) + args = append(args, labelArgs...) + } + + if startNanos != 0 || endNanos != 0 { + conditions = append(conditions, timeFilter) + args = append(args, timeArgs...) + } + + whereClause, _ := BuildWhereClause(conditions, args) + return whereClause, args, qp, nil +} + +// escapeIdent escapes a single quote inside a label name embedded as an +// SQL string literal. We can't use a bind parameter here because DuckDB +// (like most engines) doesn't allow parameterising the key argument of +// element_at() across all paths. +func escapeIdent(s string) string { + return strings.ReplaceAll(s, "'", "''") +} diff --git a/pkg/duckdb/ingester.go b/pkg/duckdb/ingester.go new file mode 100644 index 00000000000..7212773754a --- /dev/null +++ b/pkg/duckdb/ingester.go @@ -0,0 +1,299 @@ +// Copyright 2026 The Parca Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package duckdb + +import ( + "context" + "database/sql/driver" + "fmt" + "strings" + + "github.com/apache/arrow-go/v18/arrow" + "github.com/apache/arrow-go/v18/arrow/array" + "github.com/dennwc/varint" + "github.com/go-kit/log" + "github.com/go-kit/log/level" + duckdb "github.com/marcboeker/go-duckdb/v2" + + "github.com/parca-dev/parca/pkg/profile" +) + +// Ingester writes Arrow profile records to DuckDB via the Appender API. +type Ingester struct { + logger log.Logger + client *Client +} + +// NewIngester returns an Ingester bound to client. +func NewIngester(logger log.Logger, client *Client) *Ingester { + return &Ingester{logger: logger, client: client} +} + +// Ingest writes record into the configured DuckDB table. +// +// Arrow → DuckDB row mapping: +// - flat columns (name, sample_type, period, ...) → scalar Appender values +// - labels. Arrow columns → MAP(VARCHAR, VARCHAR) keyed by +// - stacktrace LIST (encoded location bytes) → LIST by +// decoding each binary blob into a location struct +func (i *Ingester) Ingest(ctx context.Context, record arrow.RecordBatch) error { + if record.NumRows() == 0 { + return nil + } + + conn, err := i.client.DB().Conn(ctx) + if err != nil { + return fmt.Errorf("acquire duckdb connection: %w", err) + } + defer conn.Close() + + var appender *duckdb.Appender + if rawErr := conn.Raw(func(driverConn any) error { + dc, ok := driverConn.(driver.Conn) + if !ok { + return fmt.Errorf("duckdb raw connection is not a driver.Conn (got %T)", driverConn) + } + var aerr error + appender, aerr = duckdb.NewAppenderFromConn(dc, "", i.client.Table()) + return aerr + }); rawErr != nil { + return fmt.Errorf("create appender: %w", rawErr) + } + defer appender.Close() + + schema := record.Schema() + + nameIdx := findColumnIndex(schema, profile.ColumnName) + sampleTypeIdx := findColumnIndex(schema, profile.ColumnSampleType) + sampleUnitIdx := findColumnIndex(schema, profile.ColumnSampleUnit) + periodTypeIdx := findColumnIndex(schema, profile.ColumnPeriodType) + periodUnitIdx := findColumnIndex(schema, profile.ColumnPeriodUnit) + periodIdx := findColumnIndex(schema, profile.ColumnPeriod) + durationIdx := findColumnIndex(schema, profile.ColumnDuration) + timestampIdx := findColumnIndex(schema, profile.ColumnTimestamp) + timeNanosIdx := findColumnIndex(schema, profile.ColumnTimeNanos) + valueIdx := findColumnIndex(schema, profile.ColumnValue) + stacktraceIdx := findColumnIndex(schema, profile.ColumnStacktrace) + + labelColumns := make(map[string]int) + for idx, field := range schema.Fields() { + if strings.HasPrefix(field.Name, profile.ColumnLabelsPrefix) { + labelColumns[strings.TrimPrefix(field.Name, profile.ColumnLabelsPrefix)] = idx + } + } + + for row := 0; row < int(record.NumRows()); row++ { + labels := make(duckdb.Map, len(labelColumns)) + for name, colIdx := range labelColumns { + if v := getStringValue(record, colIdx, row); v != "" { + labels[name] = v + } + } + + st := buildStacktraceList(record, stacktraceIdx, row) + + err := appender.AppendRow( + getStringValue(record, nameIdx, row), + getStringValue(record, sampleTypeIdx, row), + getStringValue(record, sampleUnitIdx, row), + getStringValue(record, periodTypeIdx, row), + getStringValue(record, periodUnitIdx, row), + getInt64Value(record, periodIdx, row), + getInt64Value(record, durationIdx, row), + getInt64Value(record, timestampIdx, row), + getInt64Value(record, timeNanosIdx, row), + getInt64Value(record, valueIdx, row), + labels, + st, + ) + if err != nil { + level.Error(i.logger).Log("msg", "duckdb appender row failed", "row", row, "err", err) + return fmt.Errorf("append row %d: %w", row, err) + } + } + + if err := appender.Flush(); err != nil { + return fmt.Errorf("flush appender: %w", err) + } + return nil +} + +// buildStacktraceList decodes the encoded location blobs in record's +// stacktrace LIST column at row and produces the slice form the duckdb +// Appender expects for a LIST(STRUCT(...)) column. +func buildStacktraceList(record arrow.RecordBatch, colIdx, row int) []map[string]any { + if colIdx < 0 { + return nil + } + col := record.Column(colIdx) + listCol, ok := col.(*array.List) + if !ok || listCol.IsNull(row) { + return nil + } + start, end := listCol.ValueOffsets(row) + values := listCol.ListValues() + dictCol, ok := values.(*array.Dictionary) + if !ok { + return nil + } + bin, ok := dictCol.Dictionary().(*array.Binary) + if !ok { + return nil + } + + out := make([]map[string]any, 0, end-start) + for idx := int(start); idx < int(end); idx++ { + if dictCol.IsNull(idx) { + continue + } + raw := bin.Value(dictCol.GetValueIndex(idx)) + sym, _ := profile.DecodeSymbolizationInfo(raw) + line := decodeLineInfo(raw) + out = append(out, map[string]any{ + StFieldAddress: sym.Addr, + StFieldMappingStart: sym.Mapping.StartAddr, + StFieldMappingLimit: sym.Mapping.EndAddr, + StFieldMappingOffset: sym.Mapping.Offset, + StFieldMappingFile: sym.Mapping.File, + StFieldMappingBuildID: string(sym.BuildID), + StFieldLineNumber: line.LineNumber, + StFieldFunctionName: line.FunctionName, + StFieldFunctionSystemName: line.FunctionSystemName, + StFieldFunctionFilename: line.FunctionFilename, + StFieldFunctionStartLine: line.FunctionStartLine, + }) + } + return out +} + +// lineInfo mirrors the bits of the encoded location format we care about. +type lineInfo struct { + LineNumber int64 + FunctionStartLine int64 + FunctionName string + FunctionSystemName string + FunctionFilename string +} + +// decodeLineInfo decodes the line/function portion of a varint-encoded +// location record produced by the symbolizer. +func decodeLineInfo(data []byte) lineInfo { + info := lineInfo{} + + _, offset := varint.Uvarint(data) + + numLines, n := varint.Uvarint(data[offset:]) + offset += n + + hasMapping := data[offset] == 0x1 + offset++ + + if hasMapping { + // buildID + length, n := varint.Uvarint(data[offset:]) + offset += n + int(length) + // filename + length, n = varint.Uvarint(data[offset:]) + offset += n + int(length) + // memoryStart + _, n = varint.Uvarint(data[offset:]) + offset += n + // memoryLength + _, n = varint.Uvarint(data[offset:]) + offset += n + // mappingOffset + _, n = varint.Uvarint(data[offset:]) + offset += n + } + + if numLines > 0 { + ln, n := varint.Uvarint(data[offset:]) + offset += n + info.LineNumber = int64(ln) + + hasFunction := data[offset] == 0x1 + offset++ + if hasFunction { + startLine, n := varint.Uvarint(data[offset:]) + offset += n + info.FunctionStartLine = int64(startLine) + + length, n := varint.Uvarint(data[offset:]) + offset += n + info.FunctionName = string(data[offset : offset+int(length)]) + offset += int(length) + + length, n = varint.Uvarint(data[offset:]) + offset += n + info.FunctionSystemName = string(data[offset : offset+int(length)]) + offset += int(length) + + length, n = varint.Uvarint(data[offset:]) + offset += n + info.FunctionFilename = string(data[offset : offset+int(length)]) + } + } + return info +} + +func findColumnIndex(schema *arrow.Schema, name string) int { + idx := schema.FieldIndices(name) + if len(idx) == 0 { + return -1 + } + return idx[0] +} + +func getStringValue(record arrow.RecordBatch, colIdx, row int) string { + if colIdx < 0 { + return "" + } + col := record.Column(colIdx) + if col.IsNull(row) { + return "" + } + switch c := col.(type) { + case *array.Dictionary: + switch d := c.Dictionary().(type) { + case *array.Binary: + return string(d.Value(c.GetValueIndex(row))) + case *array.String: + return d.Value(c.GetValueIndex(row)) + } + case *array.String: + return c.Value(row) + case *array.Binary: + return string(c.Value(row)) + } + return "" +} + +func getInt64Value(record arrow.RecordBatch, colIdx, row int) int64 { + if colIdx < 0 { + return 0 + } + col := record.Column(colIdx) + if col.IsNull(row) { + return 0 + } + switch c := col.(type) { + case *array.Int64: + return c.Value(row) + case *array.Dictionary: + if d, ok := c.Dictionary().(*array.Int64); ok { + return d.Value(c.GetValueIndex(row)) + } + } + return 0 +} diff --git a/pkg/duckdb/querier.go b/pkg/duckdb/querier.go new file mode 100644 index 00000000000..4547e9da9b2 --- /dev/null +++ b/pkg/duckdb/querier.go @@ -0,0 +1,811 @@ +// Copyright 2026 The Parca Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package duckdb + +import ( + "context" + "database/sql" + "fmt" + "strings" + "time" + + "github.com/apache/arrow-go/v18/arrow" + "github.com/apache/arrow-go/v18/arrow/memory" + "github.com/go-kit/log" + "github.com/go-kit/log/level" + duckdb "github.com/marcboeker/go-duckdb/v2" + "github.com/prometheus/prometheus/model/timestamp" + "go.opentelemetry.io/otel/trace" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "google.golang.org/protobuf/types/known/timestamppb" + + metapb "github.com/parca-dev/parca/gen/proto/go/parca/metastore/v1alpha1" + profilestorepb "github.com/parca-dev/parca/gen/proto/go/parca/profilestore/v1alpha1" + pb "github.com/parca-dev/parca/gen/proto/go/parca/query/v1alpha1" + "github.com/parca-dev/parca/pkg/profile" + "github.com/parca-dev/parca/pkg/symbolizer" +) + +// Querier implements the query.Querier interface against DuckDB. +type Querier struct { + client *Client + logger log.Logger + tracer trace.Tracer + mem memory.Allocator + symbolizer symbolizer.SymbolizationClient +} + +// NewQuerier returns a Querier reading from client. +func NewQuerier( + client *Client, + logger log.Logger, + tracer trace.Tracer, + mem memory.Allocator, + sym symbolizer.SymbolizationClient, +) *Querier { + return &Querier{ + client: client, + logger: logger, + tracer: tracer, + mem: mem, + symbolizer: sym, + } +} + +// stacktraceLoc mirrors the STRUCT layout of stacktrace[i] in the table. +// Field tags must match the SQL column names exactly because go-duckdb's +// Composite scanner uses field names to map STRUCT entries. +type stacktraceLoc struct { + Address uint64 `db:"address"` + MappingStart uint64 `db:"mapping_start"` + MappingLimit uint64 `db:"mapping_limit"` + MappingOffset uint64 `db:"mapping_offset"` + MappingFile string `db:"mapping_file"` + MappingBuildID string `db:"mapping_build_id"` + LineNumber int64 `db:"line_number"` + FunctionName string `db:"function_name"` + FunctionSystemName string `db:"function_system_name"` + FunctionFilename string `db:"function_filename"` + FunctionStartLine int64 `db:"function_start_line"` +} + +func quotedTable(c *Client) string { return quoteIdent(c.Table()) } + +// Labels returns the unique label names within the time range and matching +// the optional profile type. +func (q *Querier) Labels( + ctx context.Context, + _ []string, + start, end time.Time, + profileType string, +) ([]string, error) { + ctx, span := q.tracer.Start(ctx, "DuckDB/Labels") + defer span.End() + + var conditions []string + var args []interface{} + + if start.Unix() != 0 && end.Unix() != 0 { + conditions = append(conditions, "time_nanos > ? AND time_nanos < ?") + args = append(args, start.UnixNano(), end.UnixNano()) + } + + if profileType != "" { + // profileType is "name:st:su:pt:pu[:delta]"; ParseQuery wants a full + // `query{}` form so add empty matchers. + if qp, err := profile.ParseQuery(profileType + "{}"); err == nil { + profileFilter, profileArgs := ProfileTypeFilter(qp) + conditions = append(conditions, profileFilter) + args = append(args, profileArgs...) + } + } + + where := "" + if len(conditions) > 0 { + where = " WHERE " + strings.Join(conditions, " AND ") + } + + // UNNEST(map_keys(labels)) flattens the per-row label keys into rows, + // then DISTINCT collapses duplicates across the result. ORDER BY is + // applied at the outer level. + query := fmt.Sprintf( + "SELECT DISTINCT k FROM (SELECT UNNEST(map_keys(labels)) AS k FROM %s%s) ORDER BY k", + quotedTable(q.client), where, + ) + + rows, err := q.client.DB().QueryContext(ctx, query, args...) + if err != nil { + return nil, fmt.Errorf("query labels: %w", err) + } + defer rows.Close() + + var out []string + for rows.Next() { + var name string + if err := rows.Scan(&name); err != nil { + return nil, fmt.Errorf("scan label name: %w", err) + } + out = append(out, name) + } + return out, rows.Err() +} + +// Values returns the unique values seen for labelName within the time range. +func (q *Querier) Values( + ctx context.Context, + labelName string, + _ []string, + start, end time.Time, + profileType string, +) ([]string, error) { + ctx, span := q.tracer.Start(ctx, "DuckDB/Values") + defer span.End() + + labelExpr := fmt.Sprintf("element_at(labels, '%s')[1]", escapeIdent(labelName)) + + conditions := []string{fmt.Sprintf("%s IS NOT NULL", labelExpr)} + var args []interface{} + + if start.Unix() != 0 && end.Unix() != 0 { + conditions = append(conditions, "time_nanos > ? AND time_nanos < ?") + args = append(args, start.UnixNano(), end.UnixNano()) + } + + if profileType != "" { + if qp, err := profile.ParseQuery(profileType + "{}"); err == nil { + profileFilter, profileArgs := ProfileTypeFilter(qp) + conditions = append(conditions, profileFilter) + args = append(args, profileArgs...) + } + } + + query := fmt.Sprintf( + "SELECT DISTINCT %s AS v FROM %s WHERE %s ORDER BY v", + labelExpr, quotedTable(q.client), strings.Join(conditions, " AND "), + ) + + rows, err := q.client.DB().QueryContext(ctx, query, args...) + if err != nil { + return nil, fmt.Errorf("query values: %w", err) + } + defer rows.Close() + + var out []string + for rows.Next() { + var v sql.NullString + if err := rows.Scan(&v); err != nil { + return nil, fmt.Errorf("scan value: %w", err) + } + if v.Valid && v.String != "" { + out = append(out, v.String) + } + } + return out, rows.Err() +} + +// ProfileTypes returns the distinct profile-type identifiers in the time +// range. +func (q *Querier) ProfileTypes( + ctx context.Context, + start, end time.Time, +) ([]*pb.ProfileType, error) { + ctx, span := q.tracer.Start(ctx, "DuckDB/ProfileTypes") + defer span.End() + + query := fmt.Sprintf(` + SELECT DISTINCT + name, + sample_type, + sample_unit, + period_type, + period_unit, + (duration > 0) AS delta + FROM %s`, quotedTable(q.client)) + + var args []interface{} + if start.Unix() != 0 && end.Unix() != 0 { + query += " WHERE time_nanos > ? AND time_nanos < ?" + args = append(args, start.UnixNano(), end.UnixNano()) + } + + rows, err := q.client.DB().QueryContext(ctx, query, args...) + if err != nil { + return nil, fmt.Errorf("query profile types: %w", err) + } + defer rows.Close() + + var out []*pb.ProfileType + for rows.Next() { + t := &pb.ProfileType{} + if err := rows.Scan( + &t.Name, &t.SampleType, &t.SampleUnit, + &t.PeriodType, &t.PeriodUnit, &t.Delta, + ); err != nil { + return nil, fmt.Errorf("scan profile type: %w", err) + } + out = append(out, t) + } + return out, rows.Err() +} + +// HasProfileData returns true if the table has any rows. +func (q *Querier) HasProfileData(ctx context.Context) (bool, error) { + types, err := q.ProfileTypes(ctx, time.UnixMilli(0), time.UnixMilli(0)) + if err != nil { + return false, err + } + return len(types) > 0, nil +} + +// QueryRange returns time-bucketed metric series for the query. +func (q *Querier) QueryRange( + ctx context.Context, + queryStr string, + startTime, endTime time.Time, + step time.Duration, + _ uint32, + sumBy []string, +) ([]*pb.MetricsSeries, error) { + ctx, span := q.tracer.Start(ctx, "DuckDB/QueryRange") + defer span.End() + + qp, err := profile.ParseQuery(queryStr) + if err != nil { + return nil, err + } + + if step < time.Second { + step = time.Second + } + + profileFilter, profileArgs := ProfileTypeFilter(qp) + labelFilter, labelArgs, err := LabelMatchersToSQL(qp.Matchers) + if err != nil { + return nil, err + } + + // Build optional sumBy projections. labels is a MAP so we extract + // each requested label via element_at(labels, 'k')[1]. + var ( + innerSumBy []string + outerSumBy []string + groupBySumBy []string + ) + for i, name := range sumBy { + alias := fmt.Sprintf("label_%d", i) + innerSumBy = append(innerSumBy, + fmt.Sprintf("element_at(labels, '%s')[1] AS %s", escapeIdent(name), alias), + ) + outerSumBy = append(outerSumBy, alias) + groupBySumBy = append(groupBySumBy, alias) + } + + innerSelectExtras := "" + outerSelectExtras := "" + if len(innerSumBy) > 0 { + innerSelectExtras = ", " + strings.Join(innerSumBy, ", ") + outerSelectExtras = strings.Join(outerSumBy, ", ") + ", " + } + + innerQuery := fmt.Sprintf(` + SELECT + (time_nanos / ?)::BIGINT * ? AS bucket, + SUM(value)::BIGINT AS total_sum, + MIN(duration) AS duration_min%s + FROM %s + WHERE %s + AND time_nanos >= ? AND time_nanos <= ?`, + innerSelectExtras, quotedTable(q.client), profileFilter, + ) + + args := []interface{}{step.Nanoseconds(), step.Nanoseconds()} + args = append(args, profileArgs...) + args = append(args, startTime.UnixNano(), endTime.UnixNano()) + + if labelFilter != "" { + innerQuery += " AND " + labelFilter + args = append(args, labelArgs...) + } + + groupByInner := "bucket" + if len(groupBySumBy) > 0 { + groupByInner = "bucket, " + strings.Join(groupBySumBy, ", ") + } + innerQuery += " GROUP BY " + groupByInner + " ORDER BY bucket" + + groupByOuter := "GROUP BY ALL" + if len(outerSumBy) == 0 { + groupByOuter = "" + } + + sqlQuery := fmt.Sprintf(` + SELECT + %sLIST({bucket: bucket, value: total_sum, duration: duration_min} ORDER BY bucket) AS samples + FROM (%s) + %s`, + outerSelectExtras, innerQuery, groupByOuter, + ) + + rows, err := q.client.DB().QueryContext(ctx, sqlQuery, args...) + if err != nil { + return nil, fmt.Errorf("query range: %w", err) + } + defer rows.Close() + + type sampleRow struct { + Bucket int64 `db:"bucket"` + Value int64 `db:"value"` + Duration int64 `db:"duration"` + } + + var resSeries []*pb.MetricsSeries + for rows.Next() { + labelValues := make([]sql.NullString, len(sumBy)) + scanArgs := make([]interface{}, 0, len(sumBy)+1) + for i := range sumBy { + scanArgs = append(scanArgs, &labelValues[i]) + } + var samples duckdb.Composite[[]sampleRow] + scanArgs = append(scanArgs, &samples) + + if err := rows.Scan(scanArgs...); err != nil { + return nil, fmt.Errorf("scan row: %w", err) + } + + pbLabels := make([]*profilestorepb.Label, len(sumBy)) + for i, n := range sumBy { + val := "" + if labelValues[i].Valid { + val = labelValues[i].String + } + pbLabels[i] = &profilestorepb.Label{Name: n, Value: val} + } + + entries := samples.Get() + pbSamples := make([]*pb.MetricsSample, 0, len(entries)) + for _, e := range entries { + vps := float64(e.Value) + if e.Duration > 0 { + vps = float64(e.Value) / (float64(e.Duration) / float64(time.Second.Nanoseconds())) + } + pbSamples = append(pbSamples, &pb.MetricsSample{ + Timestamp: timestamppb.New(time.Unix(0, e.Bucket)), + Value: e.Value, + ValuePerSecond: vps, + Duration: e.Duration, + }) + } + + resSeries = append(resSeries, &pb.MetricsSeries{ + Labelset: &profilestorepb.LabelSet{Labels: pbLabels}, + PeriodType: &pb.ValueType{ + Type: qp.Meta.PeriodType.Type, + Unit: qp.Meta.PeriodType.Unit, + }, + SampleType: &pb.ValueType{ + Type: qp.Meta.SampleType.Type, + Unit: qp.Meta.SampleType.Unit, + }, + Samples: pbSamples, + }) + } + + if err := rows.Err(); err != nil { + return nil, fmt.Errorf("iterate rows: %w", err) + } + if len(resSeries) == 0 { + return nil, status.Error( + codes.NotFound, + "No data found for the query, try a different query or time range or no data has been written to be queried yet.", + ) + } + return resSeries, nil +} + +// QuerySingle returns the symbolised profile at a single timestamp. +func (q *Querier) QuerySingle( + ctx context.Context, + queryStr string, + t time.Time, + invertCallStacks bool, +) (profile.Profile, error) { + ctx, span := q.tracer.Start(ctx, "DuckDB/QuerySingle") + defer span.End() + + qp, err := profile.ParseQuery(queryStr) + if err != nil { + return profile.Profile{}, err + } + + requestedTime := timestamp.FromTime(t) + + profileFilter, profileArgs := ProfileTypeFilter(qp) + labelFilter, labelArgs, err := LabelMatchersToSQL(qp.Matchers) + if err != nil { + return profile.Profile{}, err + } + + sqlQuery := fmt.Sprintf(` + SELECT + stacktrace, + SUM(value)::BIGINT AS value, + SUM(duration)::BIGINT AS sample_duration, + period AS sample_period + FROM %s + WHERE %s + AND timestamp = ?`, + quotedTable(q.client), profileFilter, + ) + + args := append([]interface{}{}, profileArgs...) + args = append(args, requestedTime) + + if labelFilter != "" { + sqlQuery += " AND " + labelFilter + args = append(args, labelArgs...) + } + sqlQuery += " GROUP BY stacktrace, period" + + records, err := q.runStacktraceQuery(ctx, sqlQuery, args, invertCallStacks) + if err != nil { + return profile.Profile{}, err + } + if len(records) == 0 { + return profile.Profile{}, status.Error(codes.NotFound, "could not find profile at requested time and selectors") + } + + qp.Meta.Timestamp = requestedTime + return profile.Profile{Meta: qp.Meta, Samples: records}, nil +} + +// QueryMerge returns the symbolised profile aggregated across the time range. +func (q *Querier) QueryMerge( + ctx context.Context, + queryStr string, + start, end time.Time, + aggregateByLabels []string, + invertCallStacks bool, + _ string, +) (profile.Profile, error) { + ctx, span := q.tracer.Start(ctx, "DuckDB/QueryMerge") + defer span.End() + + qp, err := profile.ParseQuery(queryStr) + if err != nil { + return profile.Profile{}, err + } + + startNanos := start.UnixNano() + endNanos := end.UnixNano() + + profileFilter, profileArgs := ProfileTypeFilter(qp) + labelFilter, labelArgs, err := LabelMatchersToSQL(qp.Matchers) + if err != nil { + return profile.Profile{}, err + } + + groupByLabels := "" + if len(aggregateByLabels) > 0 { + labels := make([]string, len(aggregateByLabels)) + for i, l := range aggregateByLabels { + name := strings.TrimPrefix(l, "labels.") + labels[i] = fmt.Sprintf("element_at(labels, '%s')[1]", escapeIdent(name)) + } + groupByLabels = ", " + strings.Join(labels, ", ") + } + + queryDuration := endNanos - startNanos + + sqlQuery := fmt.Sprintf(` + SELECT + stacktrace, + SUM(value)::BIGINT AS value, + %d::BIGINT AS sample_duration, + period AS sample_period + FROM %s + WHERE %s + AND time_nanos >= ? AND time_nanos <= ?`, + queryDuration, quotedTable(q.client), profileFilter, + ) + + args := append([]interface{}{}, profileArgs...) + args = append(args, startNanos, endNanos) + + if labelFilter != "" { + sqlQuery += " AND " + labelFilter + args = append(args, labelArgs...) + } + + sqlQuery += " GROUP BY stacktrace, period" + groupByLabels + + records, err := q.runStacktraceQuery(ctx, sqlQuery, args, invertCallStacks) + if err != nil { + return profile.Profile{}, err + } + + qp.Meta.Timestamp = startNanos + return profile.Profile{Meta: qp.Meta, Samples: records}, nil +} + +// GetProfileMetadataMappings returns the distinct mapping files seen in +// the matched samples. +func (q *Querier) GetProfileMetadataMappings( + ctx context.Context, + queryStr string, + start, end time.Time, +) ([]string, error) { + ctx, span := q.tracer.Start(ctx, "DuckDB/GetProfileMetadataMappings") + defer span.End() + + qp, err := profile.ParseQuery(queryStr) + if err != nil { + return nil, err + } + + profileFilter, profileArgs := ProfileTypeFilter(qp) + labelFilter, labelArgs, err := LabelMatchersToSQL(qp.Matchers) + if err != nil { + return nil, err + } + + sqlQuery := fmt.Sprintf(` + SELECT DISTINCT loc.mapping_file AS f + FROM %s, UNNEST(stacktrace) AS t(loc) + WHERE %s + AND time_nanos >= ? AND time_nanos <= ?`, + quotedTable(q.client), profileFilter, + ) + + args := append([]interface{}{}, profileArgs...) + args = append(args, start.UnixNano(), end.UnixNano()) + if labelFilter != "" { + sqlQuery += " AND " + labelFilter + args = append(args, labelArgs...) + } + sqlQuery += " ORDER BY f" + + rows, err := q.client.DB().QueryContext(ctx, sqlQuery, args...) + if err != nil { + return nil, fmt.Errorf("query mapping files: %w", err) + } + defer rows.Close() + + var out []string + for rows.Next() { + var s sql.NullString + if err := rows.Scan(&s); err != nil { + return nil, fmt.Errorf("scan mapping file: %w", err) + } + if s.Valid { + out = append(out, s.String) + } + } + return out, rows.Err() +} + +// GetProfileMetadataLabels returns the distinct label names seen in the +// matched samples. +func (q *Querier) GetProfileMetadataLabels( + ctx context.Context, + queryStr string, + start, end time.Time, +) ([]string, error) { + ctx, span := q.tracer.Start(ctx, "DuckDB/GetProfileMetadataLabels") + defer span.End() + + qp, err := profile.ParseQuery(queryStr) + if err != nil { + return nil, err + } + + profileFilter, profileArgs := ProfileTypeFilter(qp) + labelFilter, labelArgs, err := LabelMatchersToSQL(qp.Matchers) + if err != nil { + return nil, err + } + + sqlQuery := fmt.Sprintf(` + SELECT DISTINCT k FROM ( + SELECT UNNEST(map_keys(labels)) AS k + FROM %s + WHERE %s + AND time_nanos >= ? AND time_nanos <= ?`, + quotedTable(q.client), profileFilter, + ) + + args := append([]interface{}{}, profileArgs...) + args = append(args, start.UnixNano(), end.UnixNano()) + if labelFilter != "" { + sqlQuery += " AND " + labelFilter + args = append(args, labelArgs...) + } + sqlQuery += ") ORDER BY k" + + rows, err := q.client.DB().QueryContext(ctx, sqlQuery, args...) + if err != nil { + return nil, fmt.Errorf("query labels: %w", err) + } + defer rows.Close() + + var out []string + for rows.Next() { + var name string + if err := rows.Scan(&name); err != nil { + return nil, fmt.Errorf("scan label name: %w", err) + } + out = append(out, name) + } + return out, rows.Err() +} + +// runStacktraceQuery executes a query whose rows have the shape +// (stacktrace LIST, value BIGINT, sample_duration BIGINT, +// sample_period BIGINT). It symbolises any unsymbolised locations and +// emits one Arrow record describing the whole result set. +func (q *Querier) runStacktraceQuery( + ctx context.Context, + sqlQuery string, + args []interface{}, + invertCallStacks bool, +) ([]arrow.RecordBatch, error) { + rows, err := q.client.DB().QueryContext(ctx, sqlQuery, args...) + if err != nil { + return nil, fmt.Errorf("execute stacktrace query: %w", err) + } + defer rows.Close() + + type row struct { + stack []stacktraceLoc + value int64 + duration int64 + period int64 + } + + var samples []row + // buildID -> address -> Location, used to deduplicate symbolisation + // requests across all rows in this result. + locationIndex := make(map[string]map[uint64]*profile.Location) + + for rows.Next() { + var stack duckdb.Composite[[]stacktraceLoc] + var r row + if err := rows.Scan(&stack, &r.value, &r.duration, &r.period); err != nil { + return nil, fmt.Errorf("scan stacktrace row: %w", err) + } + r.stack = stack.Get() + + for _, loc := range r.stack { + needsSym := loc.FunctionName == "" && loc.MappingBuildID != "" && loc.Address != 0 + if !needsSym { + continue + } + if _, ok := locationIndex[loc.MappingBuildID]; !ok { + locationIndex[loc.MappingBuildID] = make(map[uint64]*profile.Location) + } + if _, ok := locationIndex[loc.MappingBuildID][loc.Address]; ok { + continue + } + locationIndex[loc.MappingBuildID][loc.Address] = &profile.Location{ + Address: loc.Address, + Mapping: &metapb.Mapping{ + BuildId: loc.MappingBuildID, + File: loc.MappingFile, + Start: loc.MappingStart, + Limit: loc.MappingLimit, + Offset: loc.MappingOffset, + }, + } + } + + samples = append(samples, r) + } + if err := rows.Err(); err != nil { + return nil, fmt.Errorf("iterate stacktrace rows: %w", err) + } + + for buildID, addrMap := range locationIndex { + locs := make([]*profile.Location, 0, len(addrMap)) + for _, l := range addrMap { + locs = append(locs, l) + } + req := symbolizer.SymbolizationRequest{ + BuildID: buildID, + Mappings: []symbolizer.SymbolizationRequestMappingAddrs{ + {Locations: locs}, + }, + } + if err := q.symbolizer.Symbolize(ctx, req); err != nil { + level.Error(q.logger).Log("msg", "failed to symbolize locations", "buildID", buildID, "err", err) + // continue with raw addresses if symbolisation fails + } + } + + if len(samples) == 0 { + return nil, nil + } + + w := profile.NewWriter(q.mem, []string{}) + defer w.Release() + + for _, s := range samples { + w.LocationsList.Append(true) + + n := len(s.stack) + for i := 0; i < n; i++ { + idx := i + if invertCallStacks { + idx = n - 1 - i + } + loc := s.stack[idx] + + w.Locations.Append(true) + w.Addresses.Append(loc.Address) + w.MappingStart.Append(loc.MappingStart) + w.MappingLimit.Append(loc.MappingLimit) + w.MappingOffset.Append(loc.MappingOffset) + + if err := w.MappingFile.Append([]byte(loc.MappingFile)); err != nil { + level.Error(q.logger).Log("msg", "append mapping file", "err", err) + } + if err := w.MappingBuildID.Append([]byte(loc.MappingBuildID)); err != nil { + level.Error(q.logger).Log("msg", "append mapping build id", "err", err) + } + + // Prefer freshly symbolised function info over the row's stored + // data when the symbolizer returned something. + var sym *profile.Location + if m, ok := locationIndex[loc.MappingBuildID]; ok { + sym = m[loc.Address] + } + + switch { + case sym != nil && len(sym.Lines) > 0: + w.Lines.Append(true) + for _, line := range sym.Lines { + w.Line.Append(true) + w.LineNumber.Append(line.Line) + if line.Function != nil { + _ = w.FunctionName.Append([]byte(line.Function.Name)) + _ = w.FunctionSystemName.Append([]byte(line.Function.SystemName)) + _ = w.FunctionFilename.Append([]byte(line.Function.Filename)) + w.FunctionStartLine.Append(line.Function.StartLine) + } else { + w.FunctionName.AppendNull() + w.FunctionSystemName.AppendNull() + w.FunctionFilename.AppendNull() + w.FunctionStartLine.AppendNull() + } + } + case loc.FunctionName != "": + w.Lines.Append(true) + w.Line.Append(true) + w.LineNumber.Append(loc.LineNumber) + _ = w.FunctionName.Append([]byte(loc.FunctionName)) + _ = w.FunctionSystemName.Append([]byte(loc.FunctionSystemName)) + _ = w.FunctionFilename.Append([]byte(loc.FunctionFilename)) + w.FunctionStartLine.Append(loc.FunctionStartLine) + default: + w.Lines.AppendNull() + } + } + + w.Value.Append(s.value) + w.Diff.Append(0) + w.TimeNanos.Append(0) + w.Period.Append(s.period) + } + + return []arrow.RecordBatch{w.RecordBuilder.NewRecordBatch()}, nil +} diff --git a/pkg/duckdb/schema.go b/pkg/duckdb/schema.go new file mode 100644 index 00000000000..75da5a1b241 --- /dev/null +++ b/pkg/duckdb/schema.go @@ -0,0 +1,94 @@ +// Copyright 2026 The Parca Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package duckdb + +import "fmt" + +// Column names matching the parca write schema. +const ( + ColName = "name" + ColSampleType = "sample_type" + ColSampleUnit = "sample_unit" + ColPeriodType = "period_type" + ColPeriodUnit = "period_unit" + ColPeriod = "period" + ColDuration = "duration" + ColTimestamp = "timestamp" + ColTimeNanos = "time_nanos" + ColValue = "value" + ColLabels = "labels" + ColStacktrace = "stacktrace" +) + +// Stacktrace STRUCT field names. +const ( + StFieldAddress = "address" + StFieldMappingStart = "mapping_start" + StFieldMappingLimit = "mapping_limit" + StFieldMappingOffset = "mapping_offset" + StFieldMappingFile = "mapping_file" + StFieldMappingBuildID = "mapping_build_id" + StFieldLineNumber = "line_number" + StFieldFunctionName = "function_name" + StFieldFunctionSystemName = "function_system_name" + StFieldFunctionFilename = "function_filename" + StFieldFunctionStartLine = "function_start_line" +) + +// CreateTableSQL returns the DDL for the profile data table. +// +// Schema choices vs ClickHouse: +// - labels: MAP(VARCHAR, VARCHAR). DuckDB-native, queryable via labels[key] +// and map_keys/map_values. Avoids the JSON extension dependency. +// - stacktrace: LIST(STRUCT(...)). Each location is a struct in a list, +// unlike ClickHouse's parallel-array Nested layout. UNNEST flattens it +// for queries that iterate locations. +// - DuckDB doesn't have ClickHouse's MergeTree partitioning. We rely on +// min/max statistics on time_nanos for time-range pruning instead. +func CreateTableSQL(table string) string { + return fmt.Sprintf(` +CREATE TABLE IF NOT EXISTS %[1]s ( + name VARCHAR NOT NULL, + sample_type VARCHAR NOT NULL, + sample_unit VARCHAR NOT NULL, + period_type VARCHAR NOT NULL, + period_unit VARCHAR NOT NULL, + period BIGINT NOT NULL, + duration BIGINT NOT NULL, + timestamp BIGINT NOT NULL, + time_nanos BIGINT NOT NULL, + value BIGINT NOT NULL, + labels MAP(VARCHAR, VARCHAR), + stacktrace STRUCT( + address UBIGINT, + mapping_start UBIGINT, + mapping_limit UBIGINT, + mapping_offset UBIGINT, + mapping_file VARCHAR, + mapping_build_id VARCHAR, + line_number BIGINT, + function_name VARCHAR, + function_system_name VARCHAR, + function_filename VARCHAR, + function_start_line BIGINT + )[] +); +`, quoteIdent(table)) +} + +// quoteIdent wraps an identifier in double quotes per SQL standard. +// We don't need full escaping because table names come from operator config. +func quoteIdent(s string) string { + return `"` + s + `"` +} diff --git a/pkg/parca/parca.go b/pkg/parca/parca.go index e3096898d8c..f0b71c16dbd 100644 --- a/pkg/parca/parca.go +++ b/pkg/parca/parca.go @@ -63,6 +63,7 @@ import ( "github.com/parca-dev/parca/pkg/clickhouse" "github.com/parca-dev/parca/pkg/config" "github.com/parca-dev/parca/pkg/debuginfo" + "github.com/parca-dev/parca/pkg/duckdb" "github.com/parca-dev/parca/pkg/kv" "github.com/parca-dev/parca/pkg/parcacol" "github.com/parca-dev/parca/pkg/profilestore" @@ -120,7 +121,10 @@ type Flags struct { ExternalLabel map[string]string `kong:"help='Label(s) to attach to all profiles in scraper-only mode.'"` GRPCHeaders map[string]string `kong:"help='Additional gRPC headers to send with each request to the remote store (key=value pairs).'"` + StorageBackend string `enum:"clickhouse,duckdb" default:"clickhouse" help:"Storage backend for profile data."` + ClickHouse FlagsClickHouse `embed:"" prefix:"clickhouse-"` + DuckDB FlagsDuckDB `embed:"" prefix:"duckdb-"` Hidden FlagsHidden `embed:"" prefix:""` } @@ -171,6 +175,12 @@ type FlagsClickHouse struct { Secure bool `kong:"help='Use TLS for ClickHouse connection.',default='false'"` } +// FlagsDuckDB configures the embedded DuckDB storage backend. +type FlagsDuckDB struct { + Path string `kong:"help='Filesystem path for the DuckDB database file. Empty means an in-memory database (volatile).',default=''"` + Table string `kong:"help='DuckDB table name for profile data.',default='stacktraces'"` +} + // FlagsHidden contains hidden flags intended only for debugging or experimental features. type FlagsHidden struct { DebugNormalizeAddresses bool `kong:"help='Normalize sampled addresses.',default='true',hidden=''"` @@ -310,43 +320,85 @@ func Run(ctx context.Context, logger log.Logger, reg *prometheus.Registry, flags return err } - // Initialize the ClickHouse storage backend. - level.Info(logger).Log("msg", "initializing ClickHouse storage backend", "address", flags.ClickHouse.Address) + // Initialize the configured storage backend. + sharedSymbolizer := symbolizer.New( + logger, + debuginfoMetadata, + symbolizer.NewBadgerCache(db), + debuginfo.NewFetcher(debuginfodClients, debuginfoBucket), + flags.Debuginfo.CacheDir, + flags.Symbolizer.ExternalAddr2linePath, + symbolizer.WithDemangleMode(flags.Symbolizer.DemangleMode), + ) - chClient, err := clickhouse.NewClient(ctx, clickhouse.Config{ - Address: flags.ClickHouse.Address, - Database: flags.ClickHouse.Database, - Username: flags.ClickHouse.Username, - Password: flags.ClickHouse.Password, - Table: flags.ClickHouse.Table, - Secure: flags.ClickHouse.Secure, - }) - if err != nil { - level.Error(logger).Log("msg", "failed to connect to ClickHouse", "err", err) - return fmt.Errorf("failed to connect to ClickHouse: %w", err) - } + var ( + profileIngester profilestore.Ingester + querier queryservice.Querier + closeBackend func() error + ) - if err := chClient.EnsureSchema(ctx); err != nil { - level.Error(logger).Log("msg", "failed to ensure ClickHouse schema", "err", err) - return fmt.Errorf("failed to ensure ClickHouse schema: %w", err) - } + switch flags.StorageBackend { + case "duckdb": + level.Info(logger).Log("msg", "initializing DuckDB storage backend", "path", duckdbPathDescription(flags.DuckDB.Path)) - var profileIngester profilestore.Ingester = clickhouse.NewIngester(logger, chClient) - var querier queryservice.Querier = clickhouse.NewQuerier( - chClient, - logger, - tracerProvider.Tracer("clickhouse-querier"), - memory.DefaultAllocator, - symbolizer.New( + ddClient, err := duckdb.NewClient(ctx, duckdb.Config{ + Path: flags.DuckDB.Path, + Table: flags.DuckDB.Table, + }) + if err != nil { + level.Error(logger).Log("msg", "failed to open DuckDB", "err", err) + return fmt.Errorf("failed to open DuckDB: %w", err) + } + if err := ddClient.EnsureSchema(ctx); err != nil { + ddClient.Close() + level.Error(logger).Log("msg", "failed to ensure DuckDB schema", "err", err) + return fmt.Errorf("failed to ensure DuckDB schema: %w", err) + } + + profileIngester = duckdb.NewIngester(logger, ddClient) + querier = duckdb.NewQuerier( + ddClient, logger, - debuginfoMetadata, - symbolizer.NewBadgerCache(db), - debuginfo.NewFetcher(debuginfodClients, debuginfoBucket), - flags.Debuginfo.CacheDir, - flags.Symbolizer.ExternalAddr2linePath, - symbolizer.WithDemangleMode(flags.Symbolizer.DemangleMode), - ), - ) + tracerProvider.Tracer("duckdb-querier"), + memory.DefaultAllocator, + sharedSymbolizer, + ) + closeBackend = ddClient.Close + + case "clickhouse", "": + level.Info(logger).Log("msg", "initializing ClickHouse storage backend", "address", flags.ClickHouse.Address) + + chClient, err := clickhouse.NewClient(ctx, clickhouse.Config{ + Address: flags.ClickHouse.Address, + Database: flags.ClickHouse.Database, + Username: flags.ClickHouse.Username, + Password: flags.ClickHouse.Password, + Table: flags.ClickHouse.Table, + Secure: flags.ClickHouse.Secure, + }) + if err != nil { + level.Error(logger).Log("msg", "failed to connect to ClickHouse", "err", err) + return fmt.Errorf("failed to connect to ClickHouse: %w", err) + } + if err := chClient.EnsureSchema(ctx); err != nil { + chClient.Close() + level.Error(logger).Log("msg", "failed to ensure ClickHouse schema", "err", err) + return fmt.Errorf("failed to ensure ClickHouse schema: %w", err) + } + + profileIngester = clickhouse.NewIngester(logger, chClient) + querier = clickhouse.NewQuerier( + chClient, + logger, + tracerProvider.Tracer("clickhouse-querier"), + memory.DefaultAllocator, + sharedSymbolizer, + ) + closeBackend = chClient.Close + + default: + return fmt.Errorf("unknown storage backend %q", flags.StorageBackend) + } s := profilestore.NewProfileColumnStore( reg, @@ -567,8 +619,10 @@ func Run(ctx context.Context, logger log.Logger, reg *prometheus.Registry, flags } // Close the storage backend after the parcaserver has shutdown to ensure no more writes occur against it. - if err := chClient.Close(); err != nil { - level.Error(logger).Log("msg", "error closing ClickHouse client", "err", err) + if closeBackend != nil { + if err := closeBackend(); err != nil { + level.Error(logger).Log("msg", "error closing storage backend", "err", err) + } } }, ) @@ -842,6 +896,13 @@ func (t *perRequestBearerToken) RequireTransportSecurity() bool { return !t.insecure } +func duckdbPathDescription(path string) string { + if path == "" { + return "in-memory (volatile)" + } + return path +} + func getDiscoveryConfigs(cfgs []*config.ScrapeConfig) map[string]discovery.Configs { c := make(map[string]discovery.Configs) for _, v := range cfgs {