diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go index 01c2bc9ebc9..fa053bbb403 100644 --- a/pkg/ingester/ingester_test.go +++ b/pkg/ingester/ingester_test.go @@ -104,7 +104,7 @@ func TestPprofToArrow(t *testing.T) { }}, } - r, err := normalizer.WriteRawRequestToArrowRecord(ctx, mem, req, schema) + r, err := normalizer.WriteRawRequestToArrowRecord(ctx, mem, req) require.NoError(t, err) defer r.Release() ingester := NewIngester(logger, table) @@ -164,7 +164,7 @@ func TestUncompressedPprofToArrow(t *testing.T) { }}, } - rec, err := normalizer.WriteRawRequestToArrowRecord(ctx, mem, req, schema) + rec, err := normalizer.WriteRawRequestToArrowRecord(ctx, mem, req) require.NoError(t, err) defer rec.Release() ingester := NewIngester(logger, table) @@ -183,9 +183,6 @@ func TestUncompressedPprofToArrow(t *testing.T) { func BenchmarkNormalizeWriteRawRequest(b *testing.B) { ctx := context.Background() - schema, err := profile.Schema() - require.NoError(b, err) - fileContent, err := os.ReadFile("../query/testdata/alloc_objects.pb.gz") require.NoError(b, err) @@ -214,7 +211,7 @@ func BenchmarkNormalizeWriteRawRequest(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { - r, err := normalizer.WriteRawRequestToArrowRecord(ctx, mem, req, schema) + r, err := normalizer.WriteRawRequestToArrowRecord(ctx, mem, req) if err != nil { b.Fatal(err) } diff --git a/pkg/normalizer/arrow.go b/pkg/normalizer/arrow.go index 57420e4c716..ecc1032a2f9 100644 --- a/pkg/normalizer/arrow.go +++ b/pkg/normalizer/arrow.go @@ -23,10 +23,7 @@ import ( "github.com/apache/arrow-go/v18/arrow" "github.com/apache/arrow-go/v18/arrow/array" - "github.com/apache/arrow-go/v18/arrow/compute" "github.com/apache/arrow-go/v18/arrow/memory" - "github.com/polarsignals/frostdb/dynparquet" - "github.com/polarsignals/frostdb/pqarrow/arrowutils" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" @@ -58,22 +55,19 @@ type Metrics struct { type arrowToInternalConverter struct { metrics *Metrics - mem memory.Allocator - schema *dynparquet.Schema + mem memory.Allocator b *InternalRecordBuilderV1 } func NewArrowToInternalConverter( mem memory.Allocator, - schema *dynparquet.Schema, metrics *Metrics, ) *arrowToInternalConverter { return &arrowToInternalConverter{ metrics: metrics, - mem: mem, - schema: schema, + mem: mem, b: &InternalRecordBuilderV1{}, } @@ -379,7 +373,7 @@ func (r *InternalRecordBuilderV1) validate() error { } func (c *arrowToInternalConverter) NewRecord(ctx context.Context) (arrow.RecordBatch, error) { - newRecord := array.NewRecordBatch( + return array.NewRecordBatch( arrow.NewSchema(append(c.b.LabelFields, []arrow.Field{{ Name: profile.ColumnName, Type: &arrow.DictionaryType{IndexType: arrow.PrimitiveTypes.Uint32, ValueType: arrow.BinaryTypes.Binary}, @@ -429,64 +423,7 @@ func (c *arrowToInternalConverter) NewRecord(ctx context.Context) (arrow.RecordB c.b.Value, }...), int64(c.b.Value.Len()), - ) - - sortingColDefs := c.schema.ColumnDefinitionsForSortingColumns() - sortingColumns := make([]arrowutils.SortingColumn, 0, len(sortingColDefs)) - arrowSchema := newRecord.Schema() - arrowFields := arrowSchema.Fields() - for _, col := range c.schema.SortingColumns() { - direction := arrowutils.Ascending - if col.Descending() { - direction = arrowutils.Descending - } - - colDef, found := c.schema.ColumnByName(col.ColumnName()) - if !found { - return nil, fmt.Errorf("sorting column %v not found in schema", col.ColumnName()) - } - - if colDef.Dynamic { - for i, c := range arrowFields { - if strings.HasPrefix(c.Name, colDef.Name) { - sortingColumns = append(sortingColumns, arrowutils.SortingColumn{ - Index: i, - Direction: direction, - NullsFirst: col.NullsFirst(), - }) - } - } - } else { - indices := arrowSchema.FieldIndices(colDef.Name) - for _, i := range indices { - sortingColumns = append(sortingColumns, arrowutils.SortingColumn{ - Index: i, - Direction: direction, - NullsFirst: col.NullsFirst(), - }) - } - } - } - - sortedIdxs, err := arrowutils.SortRecord(newRecord, sortingColumns) - if err != nil { - return nil, fmt.Errorf("failed to sort record: %w", err) - } - isSorted := true - for i := 0; i < sortedIdxs.Len(); i++ { - if sortedIdxs.Value(i) != int32(i) { - isSorted = false - break - } - } - - if isSorted { - return newRecord, nil - } - - // Release the record, since Take will allocate a new, sorted, record. - defer newRecord.Release() - return arrowutils.Take(compute.WithAllocator(ctx, c.mem), newRecord, sortedIdxs) + ), nil } func (c *arrowToInternalConverter) AddLocationsRecordV1( diff --git a/pkg/normalizer/arrow_v2_test.go b/pkg/normalizer/arrow_v2_test.go index d3f9cd4cd9e..11e5fe8101e 100644 --- a/pkg/normalizer/arrow_v2_test.go +++ b/pkg/normalizer/arrow_v2_test.go @@ -37,10 +37,7 @@ func TestAddSampleRecordV2(t *testing.T) { rec := buildV2SampleRecord(t, mem) defer rec.Release() - dynSchema, err := profile.Schema() - require.NoError(t, err) - - c := NewArrowToInternalConverter(mem, dynSchema, NewMetrics(prometheus.NewRegistry())) + c := NewArrowToInternalConverter(mem, NewMetrics(prometheus.NewRegistry())) defer c.Release() require.NoError(t, c.AddSampleRecord(context.Background(), rec)) diff --git a/pkg/normalizer/normalizer.go b/pkg/normalizer/normalizer.go index 70b17c0dd70..8a6cab6f9ce 100644 --- a/pkg/normalizer/normalizer.go +++ b/pkg/normalizer/normalizer.go @@ -22,19 +22,12 @@ import ( "io" "slices" "sort" - "strings" "time" "github.com/apache/arrow-go/v18/arrow" "github.com/apache/arrow-go/v18/arrow/array" - "github.com/apache/arrow-go/v18/arrow/compute" "github.com/apache/arrow-go/v18/arrow/memory" "github.com/gogo/status" - "github.com/parquet-go/parquet-go" - "github.com/polarsignals/frostdb/dynparquet" - "github.com/polarsignals/frostdb/pqarrow" - "github.com/polarsignals/frostdb/pqarrow/arrowutils" - "github.com/polarsignals/frostdb/query/logicalplan" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/util/strutil" pprofextended "go.opentelemetry.io/proto/otlp/profiles/v1development" @@ -119,7 +112,6 @@ func WriteRawRequestToArrowRecord( ctx context.Context, mem memory.Allocator, req *profilestorepb.WriteRawRequest, - schema *dynparquet.Schema, ) (arrow.RecordBatch, error) { normalizedRequest, err := NormalizeWriteRawRequest( ctx, @@ -129,18 +121,7 @@ func WriteRawRequestToArrowRecord( return nil, err } - ps, err := schema.GetDynamicParquetSchema(map[string][]string{ - profile.ColumnLabels: normalizedRequest.AllLabelNames, - }) - if err != nil { - return nil, err - } - defer schema.PutPooledParquetSchema(ps) - - arrowSchema, err := pqarrow.ParquetSchemaToArrowSchema(ctx, ps.Schema, schema, logicalplan.IterOptions{}) - if err != nil { - return nil, err - } + arrowSchema := profile.BuildArrowSchema(normalizedRequest.AllLabelNames) b := array.NewRecordBuilder(mem, arrowSchema) numRows := 0 @@ -154,7 +135,7 @@ func WriteRawRequestToArrowRecord( b.Reserve(numRows) defer b.Release() - for _, col := range schema.Columns() { + for _, col := range profile.SchemaDefinition().Columns { switch col.Name { case profile.ColumnDuration: cBuilder := b.Field(b.Schema().FieldIndices(col.Name)[0]).(*array.Int64Builder) @@ -332,61 +313,7 @@ func WriteRawRequestToArrowRecord( return nil, nil } - sortingColDefs := schema.ColumnDefinitionsForSortingColumns() - sortingColumns := make([]arrowutils.SortingColumn, 0, len(sortingColDefs)) - arrowFields := arrowSchema.Fields() - for _, col := range schema.SortingColumns() { - direction := arrowutils.Ascending - if col.Descending() { - direction = arrowutils.Descending - } - - colDef, found := schema.ColumnByName(col.ColumnName()) - if !found { - return nil, fmt.Errorf("sorting column %v not found in schema", col.ColumnName()) - } - - if colDef.Dynamic { - for i, c := range arrowFields { - if strings.HasPrefix(c.Name, colDef.Name) { - sortingColumns = append(sortingColumns, arrowutils.SortingColumn{ - Index: i, - Direction: direction, - NullsFirst: col.NullsFirst(), - }) - } - } - } else { - indices := arrowSchema.FieldIndices(colDef.Name) - for _, i := range indices { - sortingColumns = append(sortingColumns, arrowutils.SortingColumn{ - Index: i, - Direction: direction, - NullsFirst: col.NullsFirst(), - }) - } - } - } - - sortedIdxs, err := arrowutils.SortRecord(record, sortingColumns) - if err != nil { - return nil, fmt.Errorf("failed to sort record: %w", err) - } - isSorted := true - for i := 0; i < sortedIdxs.Len(); i++ { - if sortedIdxs.Value(i) != int32(i) { - isSorted = false - break - } - } - - if isSorted { - return record, nil - } - - // Release the record, since Take will allocate a new, sorted, record. - defer record.Release() - return arrowutils.Take(compute.WithAllocator(ctx, mem), record, sortedIdxs) + return record, nil } func NormalizePprof( @@ -623,80 +550,3 @@ func LabelNamesFromSamples( } } -// SampleToParquetRow converts a sample to a Parquet row. The passed labels -// must be sorted. -func SampleToParquetRow( - schema *dynparquet.Schema, - row parquet.Row, - labelNames, profileLabelNames, profileNumLabelNames []string, - lset map[string]string, - meta profile.Meta, - s *NormalizedSample, -) parquet.Row { - // schema.Columns() returns a sorted list of all columns. - // We match on the column's name to insert the correct values. - // We track the columnIndex to insert each column at the correct index. - columnIndex := 0 - for _, column := range schema.Columns() { - switch column.Name { - case profile.ColumnDuration: - row = append(row, parquet.ValueOf(meta.Duration).Level(0, 0, columnIndex)) - columnIndex++ - case profile.ColumnName: - row = append(row, parquet.ValueOf(meta.Name).Level(0, 0, columnIndex)) - columnIndex++ - case profile.ColumnPeriod: - row = append(row, parquet.ValueOf(meta.Period).Level(0, 0, columnIndex)) - columnIndex++ - case profile.ColumnPeriodType: - row = append(row, parquet.ValueOf(meta.PeriodType.Type).Level(0, 0, columnIndex)) - columnIndex++ - case profile.ColumnPeriodUnit: - row = append(row, parquet.ValueOf(meta.PeriodType.Unit).Level(0, 0, columnIndex)) - columnIndex++ - case profile.ColumnSampleType: - row = append(row, parquet.ValueOf(meta.SampleType.Type).Level(0, 0, columnIndex)) - columnIndex++ - case profile.ColumnSampleUnit: - row = append(row, parquet.ValueOf(meta.SampleType.Unit).Level(0, 0, columnIndex)) - columnIndex++ - case profile.ColumnStacktrace: - if len(s.Locations) == 0 { - row = append(row, parquet.ValueOf(nil).Level(0, 0, columnIndex)) - } - for i, s := range s.Locations { - switch i { - case 0: - row = append(row, parquet.ValueOf(s).Level(0, 1, columnIndex)) - default: - row = append(row, parquet.ValueOf(s).Level(1, 1, columnIndex)) - } - } - columnIndex++ - case profile.ColumnTimestamp: - row = append(row, parquet.ValueOf(meta.Timestamp).Level(0, 0, columnIndex)) - columnIndex++ - case profile.ColumnTimeNanos: - row = append(row, parquet.ValueOf(meta.TimeNanos).Level(0, 0, columnIndex)) - columnIndex++ - case profile.ColumnValue: - row = append(row, parquet.ValueOf(s.Value).Level(0, 0, columnIndex)) - columnIndex++ - - // All remaining cases take care of dynamic columns - case profile.ColumnLabels: - for _, name := range labelNames { - if value, ok := lset[name]; ok { - row = append(row, parquet.ValueOf(value).Level(0, 1, columnIndex)) - } else { - row = append(row, parquet.ValueOf(nil).Level(0, 0, columnIndex)) - } - columnIndex++ - } - default: - panic(fmt.Errorf("conversion not implement for column: %s", column.Name)) - } - } - - return row -} diff --git a/pkg/normalizer/otel.go b/pkg/normalizer/otel.go index 66d4be652de..3d9026abbc8 100644 --- a/pkg/normalizer/otel.go +++ b/pkg/normalizer/otel.go @@ -21,11 +21,8 @@ import ( "time" "github.com/apache/arrow-go/v18/arrow" + "github.com/apache/arrow-go/v18/arrow/array" "github.com/apache/arrow-go/v18/arrow/memory" - "github.com/parquet-go/parquet-go" - "github.com/polarsignals/frostdb/dynparquet" - "github.com/polarsignals/frostdb/pqarrow" - "github.com/polarsignals/frostdb/query/logicalplan" "github.com/prometheus/prometheus/util/strutil" otelgrpcprofilingpb "go.opentelemetry.io/proto/otlp/collector/profiles/v1development" v1 "go.opentelemetry.io/proto/otlp/common/v1" @@ -38,27 +35,25 @@ import ( func OtlpRequestToArrowRecord( ctx context.Context, req *otelgrpcprofilingpb.ExportProfilesServiceRequest, - schema *dynparquet.Schema, mem memory.Allocator, ) (arrow.RecordBatch, error) { if err := ValidateOtelExportProfilesServiceRequest(req); err != nil { return nil, fmt.Errorf("invalid request: %w", err) } - w, err := newProfileWriter( - mem, - schema, - getAllLabelNames(req), - ) - if err != nil { - return nil, err - } + w := newProfileWriter(mem, getAllLabelNames(req)) + defer w.Release() if err := w.writeResourceProfiles(req); err != nil { return nil, err } - return w.ArrowRecord(ctx) + rec := w.NewRecordBatch() + if rec.NumRows() == 0 { + rec.Release() + return nil, nil + } + return rec, nil } type labelNames struct { @@ -154,42 +149,127 @@ func getAllLabelNames(req *otelgrpcprofilingpb.ExportProfilesServiceRequest) []s } type profileWriter struct { - mem memory.Allocator - labelNames []string - schema *dynparquet.Schema - buffer *dynparquet.Buffer - row parquet.Row + rb *array.RecordBuilder + + duration *array.Int64Builder + name *array.BinaryDictionaryBuilder + period *array.Int64Builder + periodType *array.BinaryDictionaryBuilder + periodUnit *array.BinaryDictionaryBuilder + sampleType *array.BinaryDictionaryBuilder + sampleUnit *array.BinaryDictionaryBuilder + stacktrace *array.ListBuilder + stacktraceVal *array.BinaryDictionaryBuilder + timestamp *array.Int64Builder + timeNanos *array.Int64Builder + value *array.Int64Builder + + // labelBuilders are aligned with labelNames. + labelBuilders []*array.BinaryDictionaryBuilder } -func newProfileWriter( - mem memory.Allocator, - schema *dynparquet.Schema, - labelNames []string, -) (*profileWriter, error) { - // Create a buffer with all possible labels, pprof labels and pprof num labels as dynamic columns. - // We use NewBuffer instead of GetBuffer here since analysis showed a very - // low hit rate, meaning buffers were GCed faster than they could be reused. - // The downside of using a pool is that buffers are held around for longer. - // Using NewBuffer means that we pay the price of reallocating a buffer, - // but they get GCed a lot sooner. - buffer, err := schema.NewBuffer(map[string][]string{ - profile.ColumnLabels: labelNames, - }) - if err != nil { - return nil, err +func newProfileWriter(mem memory.Allocator, labelNames []string) *profileWriter { + schema := profile.BuildArrowSchema(labelNames) + rb := array.NewRecordBuilder(mem, schema) + + w := &profileWriter{ + labelNames: labelNames, + rb: rb, + labelBuilders: make([]*array.BinaryDictionaryBuilder, len(labelNames)), + } + + for i, field := range schema.Fields() { + switch field.Name { + case profile.ColumnDuration: + w.duration = rb.Field(i).(*array.Int64Builder) + case profile.ColumnName: + w.name = rb.Field(i).(*array.BinaryDictionaryBuilder) + case profile.ColumnPeriod: + w.period = rb.Field(i).(*array.Int64Builder) + case profile.ColumnPeriodType: + w.periodType = rb.Field(i).(*array.BinaryDictionaryBuilder) + case profile.ColumnPeriodUnit: + w.periodUnit = rb.Field(i).(*array.BinaryDictionaryBuilder) + case profile.ColumnSampleType: + w.sampleType = rb.Field(i).(*array.BinaryDictionaryBuilder) + case profile.ColumnSampleUnit: + w.sampleUnit = rb.Field(i).(*array.BinaryDictionaryBuilder) + case profile.ColumnStacktrace: + w.stacktrace = rb.Field(i).(*array.ListBuilder) + w.stacktraceVal = w.stacktrace.ValueBuilder().(*array.BinaryDictionaryBuilder) + case profile.ColumnTimestamp: + w.timestamp = rb.Field(i).(*array.Int64Builder) + case profile.ColumnTimeNanos: + w.timeNanos = rb.Field(i).(*array.Int64Builder) + case profile.ColumnValue: + w.value = rb.Field(i).(*array.Int64Builder) + default: + if strings.HasPrefix(field.Name, profile.ColumnLabelsPrefix) { + ln := strings.TrimPrefix(field.Name, profile.ColumnLabelsPrefix) + for j, name := range labelNames { + if name == ln { + w.labelBuilders[j] = rb.Field(i).(*array.BinaryDictionaryBuilder) + break + } + } + } + } } - return &profileWriter{ - mem: mem, + return w +} - labelNames: labelNames, - schema: schema, - buffer: buffer, +func (w *profileWriter) Release() { + w.rb.Release() +} - row: make(parquet.Row, 0, len(schema.ParquetSchema().Fields())), - }, nil +func (w *profileWriter) NewRecordBatch() arrow.RecordBatch { + return w.rb.NewRecordBatch() +} + +func (w *profileWriter) appendSample(meta profile.Meta, value int64, locations [][]byte, ls map[string]string) error { + w.duration.Append(meta.Duration) + if err := w.name.AppendString(meta.Name); err != nil { + return err + } + w.period.Append(meta.Period) + if err := w.periodType.AppendString(meta.PeriodType.Type); err != nil { + return err + } + if err := w.periodUnit.AppendString(meta.PeriodType.Unit); err != nil { + return err + } + if err := w.sampleType.AppendString(meta.SampleType.Type); err != nil { + return err + } + if err := w.sampleUnit.AppendString(meta.SampleType.Unit); err != nil { + return err + } + w.stacktrace.Append(len(locations) != 0) + for _, loc := range locations { + if len(loc) == 0 { + w.stacktraceVal.AppendNull() + continue + } + if err := w.stacktraceVal.Append(loc); err != nil { + return err + } + } + w.timestamp.Append(meta.Timestamp) + w.timeNanos.Append(meta.TimeNanos) + w.value.Append(value) + for i, name := range w.labelNames { + if val, ok := ls[name]; ok { + if err := w.labelBuilders[i].AppendString(val); err != nil { + return err + } + continue + } + w.labelBuilders[i].AppendNull() + } + return nil } func (w *profileWriter) writeResourceProfiles( @@ -212,6 +292,17 @@ func (w *profileWriter) writeResourceProfiles( ls.addOtelAttributes(sp.Scope.Attributes) ls.addOtelAttributes(rp.Resource.Attributes) + locations := serializeOtelStacktrace( + p, + sample, + req.Dictionary.FunctionTable, + req.Dictionary.MappingTable, + req.Dictionary.LocationTable, + req.Dictionary.AttributeTable, + req.Dictionary.StackTable, + req.Dictionary.StringTable, + ) + // see https://github.com/open-telemetry/opentelemetry-proto/blob/30fc16100aa513254a71ef83ae2de321fb1bdfeb/opentelemetry/proto/profiles/v1development/profiles.proto#L345 // for more information on the TimestampsUnixNano and Values relationship. if len(sample.TimestampsUnixNano) > 0 { @@ -220,36 +311,17 @@ func (w *profileWriter) writeResourceProfiles( if len(sample.Values) > 0 { value = sample.Values[i] } - row := SampleToParquetRow( - w.schema, - w.row[:0], - w.labelNames, nil, nil, - ls.labels, - profile.Meta{ - Name: metas[0].Name, - PeriodType: metas[0].PeriodType, - SampleType: metas[0].SampleType, - Timestamp: int64(ts) / time.Millisecond.Nanoseconds(), - TimeNanos: int64(ts), - Duration: metas[0].Duration, - Period: metas[0].Period, - }, - &NormalizedSample{ - Locations: serializeOtelStacktrace( - p, - sample, - req.Dictionary.FunctionTable, - req.Dictionary.MappingTable, - req.Dictionary.LocationTable, - req.Dictionary.AttributeTable, - req.Dictionary.StackTable, - req.Dictionary.StringTable, - ), - Value: value, - }, - ) - if _, err := w.buffer.WriteRows([]parquet.Row{row}); err != nil { - return fmt.Errorf("failed to write row to buffer: %w", err) + meta := profile.Meta{ + Name: metas[0].Name, + PeriodType: metas[0].PeriodType, + SampleType: metas[0].SampleType, + Timestamp: int64(ts) / time.Millisecond.Nanoseconds(), + TimeNanos: int64(ts), + Duration: metas[0].Duration, + Period: metas[0].Period, + } + if err := w.appendSample(meta, value, locations, ls.labels); err != nil { + return err } } } else { @@ -257,29 +329,8 @@ func (w *profileWriter) writeResourceProfiles( if value == 0 { continue } - - row := SampleToParquetRow( - w.schema, - w.row[:0], - w.labelNames, nil, nil, - ls.labels, - metas[j], - &NormalizedSample{ - Locations: serializeOtelStacktrace( - p, - sample, - req.Dictionary.FunctionTable, - req.Dictionary.MappingTable, - req.Dictionary.LocationTable, - req.Dictionary.AttributeTable, - req.Dictionary.StackTable, - req.Dictionary.StringTable, - ), - Value: value, - }, - ) - if _, err := w.buffer.WriteRows([]parquet.Row{row}); err != nil { - return fmt.Errorf("failed to write row to buffer: %w", err) + if err := w.appendSample(metas[j], value, locations, ls.labels); err != nil { + return err } } } @@ -291,27 +342,6 @@ func (w *profileWriter) writeResourceProfiles( return nil } -func (w *profileWriter) ArrowRecord(ctx context.Context) (arrow.RecordBatch, error) { - if w.buffer.NumRows() == 0 { - // If there are no rows in the buffer we simply return early - return nil, nil - } - - // We need to sort the buffer so the rows are inserted in sorted order later - // on the storage nodes. - w.buffer.Sort() - - // Convert the sorted buffer to an arrow record. - converter := pqarrow.NewParquetConverter(w.mem, logicalplan.IterOptions{}) - defer converter.Close() - - if err := converter.Convert(ctx, w.buffer, w.schema); err != nil { - return nil, fmt.Errorf("failed to convert parquet to arrow: %w", err) - } - - return converter.NewRecord(), nil -} - func ValidateOtelExportProfilesServiceRequest(req *otelgrpcprofilingpb.ExportProfilesServiceRequest) error { if req == nil { return fmt.Errorf("request is nil") diff --git a/pkg/parca/parca.go b/pkg/parca/parca.go index 6b6dbde6cca..c6313b27f81 100644 --- a/pkg/parca/parca.go +++ b/pkg/parca/parca.go @@ -35,7 +35,6 @@ import ( grpc_prometheus "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus" "github.com/grpc-ecosystem/grpc-gateway/v2/runtime" "github.com/oklog/run" - "github.com/polarsignals/frostdb/dynparquet" "github.com/prometheus/client_golang/prometheus" promconfig "github.com/prometheus/common/config" "github.com/prometheus/prometheus/discovery" @@ -67,7 +66,6 @@ import ( "github.com/parca-dev/parca/pkg/ingester" "github.com/parca-dev/parca/pkg/kv" "github.com/parca-dev/parca/pkg/parcacol" - "github.com/parca-dev/parca/pkg/profile" "github.com/parca-dev/parca/pkg/profilestore" queryservice "github.com/parca-dev/parca/pkg/query" "github.com/parca-dev/parca/pkg/scrape" @@ -351,18 +349,11 @@ func Run(ctx context.Context, logger log.Logger, reg *prometheus.Registry, flags ), ) - schema, err := dynparquet.SchemaFromDefinition(profile.SchemaDefinition()) - if err != nil { - level.Error(logger).Log("msg", "schema from definition", "err", err) - return err - } - s := profilestore.NewProfileColumnStore( reg, logger, tracerProvider.Tracer("profilestore"), profileIngester, - schema, memory.DefaultAllocator, ) diff --git a/pkg/parca/parca_test.go b/pkg/parca/parca_test.go index 9ee4ff5cf34..54a3658581e 100644 --- a/pkg/parca/parca_test.go +++ b/pkg/parca/parca_test.go @@ -183,9 +183,6 @@ func TestConsistency(t *testing.T) { colDB, err := col.DB(context.Background(), "parca") require.NoError(t, err) - schema, err := parcaprofile.Schema() - require.NoError(t, err) - table, err := colDB.Table( "stacktraces", frostdb.NewTableConfig(parcaprofile.SchemaDefinition()), @@ -211,7 +208,6 @@ func TestConsistency(t *testing.T) { logger, tracer, ingester, - schema, memory.DefaultAllocator, ) @@ -301,9 +297,6 @@ func TestPGOE2e(t *testing.T) { colDB, err := col.DB(context.Background(), "parca") require.NoError(t, err) - schema, err := parcaprofile.Schema() - require.NoError(t, err) - table, err := colDB.Table( "stacktraces", frostdb.NewTableConfig(parcaprofile.SchemaDefinition()), @@ -322,7 +315,6 @@ func TestPGOE2e(t *testing.T) { logger, tracer, ingester, - schema, memory.DefaultAllocator, ) @@ -402,9 +394,6 @@ func TestLabels(t *testing.T) { colDB, err := col.DB(context.Background(), "parca") require.NoError(t, err) - schema, err := parcaprofile.Schema() - require.NoError(t, err) - table, err := colDB.Table( "labels", frostdb.NewTableConfig(parcaprofile.SchemaDefinition()), @@ -423,7 +412,6 @@ func TestLabels(t *testing.T) { logger, tracer, ingester, - schema, memory.DefaultAllocator, ) _, err = store.WriteRaw(ctx, &profilestorepb.WriteRawRequest{ diff --git a/pkg/profile/schema.go b/pkg/profile/schema.go index 401017bdcd1..dee62a95837 100644 --- a/pkg/profile/schema.go +++ b/pkg/profile/schema.go @@ -14,6 +14,9 @@ package profile import ( + "fmt" + + "github.com/apache/arrow-go/v18/arrow" "github.com/polarsignals/frostdb/dynparquet" schemapb "github.com/polarsignals/frostdb/gen/proto/go/frostdb/schema/v1alpha1" ) @@ -162,3 +165,54 @@ func SchemaDefinition() *schemapb.Schema { func Schema() (*dynparquet.Schema, error) { return dynparquet.SchemaFromDefinition(SchemaDefinition()) } + +// BuildArrowSchema returns the Arrow schema for the parca write/ingest +// profile data, expanding the dynamic ColumnLabels column into one +// "labels." field per labelName. The column order matches the proto +// definition order, with dynamic labels emitted in place of the labels +// column. Static columns map to: +// - TYPE_INT64 → arrow.PrimitiveTypes.Int64 +// - TYPE_STRING (non-repeated) → dictionary-encoded binary +// - TYPE_STRING (repeated) → list of dictionary-encoded binary +func BuildArrowSchema(labelNames []string) *arrow.Schema { + def := SchemaDefinition() + fields := make([]arrow.Field, 0, len(def.Columns)+len(labelNames)) + for _, col := range def.Columns { + if col.Dynamic && col.Name == ColumnLabels { + for _, name := range labelNames { + fields = append(fields, arrow.Field{ + Name: ColumnLabelsPrefix + name, + Type: dictBinary(), + Nullable: true, + }) + } + continue + } + fields = append(fields, columnToArrowField(col)) + } + return arrow.NewSchema(fields, nil) +} + +func columnToArrowField(col *schemapb.Column) arrow.Field { + layout := col.StorageLayout + field := arrow.Field{Name: col.Name, Nullable: layout.Nullable} + switch layout.Type { + case schemapb.StorageLayout_TYPE_INT64: + field.Type = arrow.PrimitiveTypes.Int64 + case schemapb.StorageLayout_TYPE_STRING: + field.Type = dictBinary() + default: + panic(fmt.Sprintf("profile: unsupported column %q storage type %v", col.Name, layout.Type)) + } + if layout.Repeated { + field.Type = arrow.ListOf(field.Type) + } + return field +} + +func dictBinary() arrow.DataType { + return &arrow.DictionaryType{ + IndexType: arrow.PrimitiveTypes.Uint32, + ValueType: arrow.BinaryTypes.Binary, + } +} diff --git a/pkg/profilestore/profilecolumnstore.go b/pkg/profilestore/profilecolumnstore.go index bf9a71271b3..0cdc1854a0b 100644 --- a/pkg/profilestore/profilecolumnstore.go +++ b/pkg/profilestore/profilecolumnstore.go @@ -28,7 +28,6 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/gogo/status" - "github.com/polarsignals/frostdb/dynparquet" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/promql/parser" "go.opentelemetry.io/otel/trace" @@ -66,8 +65,7 @@ type ProfileColumnStore struct { // ip as the key agents map[string]agent - mem memory.Allocator - schema *dynparquet.Schema + mem memory.Allocator converterMetrics *normalizer.Metrics } @@ -79,7 +77,6 @@ func NewProfileColumnStore( logger log.Logger, tracer trace.Tracer, ingester ingester.Ingester, - schema *dynparquet.Schema, mem memory.Allocator, ) *ProfileColumnStore { normalizerMetrics := normalizer.NewMetrics(reg) @@ -87,7 +84,6 @@ func NewProfileColumnStore( logger: logger, tracer: tracer, ingester: ingester, - schema: schema, mem: mem, agents: make(map[string]agent), @@ -100,7 +96,6 @@ func (s *ProfileColumnStore) writeSeries(ctx context.Context, req *profilestorep ctx, s.mem, req, - s.schema, ) if err != nil { return err @@ -278,7 +273,6 @@ func (s *ProfileColumnStore) WriteArrow(ctx context.Context, req *profilestorepb c := normalizer.NewArrowToInternalConverter( s.mem, - s.schema, s.converterMetrics, ) defer c.Release() @@ -388,7 +382,6 @@ func (s *ProfileColumnStore) write(ctx context.Context, server profilestorepb.Pr c := normalizer.NewArrowToInternalConverter( s.mem, - s.schema, s.converterMetrics, ) defer c.Release() @@ -473,7 +466,6 @@ func (s *ProfileColumnStore) Export(ctx context.Context, req *otelgrpcprofilingp r, err := normalizer.OtlpRequestToArrowRecord( ctx, req, - s.schema, s.mem, ) if err != nil { diff --git a/pkg/profilestore/profilestore_test.go b/pkg/profilestore/profilestore_test.go index 731e9a5ecd9..da4eb96900e 100644 --- a/pkg/profilestore/profilestore_test.go +++ b/pkg/profilestore/profilestore_test.go @@ -44,9 +44,6 @@ func Test_LabelName_Error(t *testing.T) { colDB, err := col.DB(context.Background(), "parca") require.NoError(t, err) - schema, err := profile.Schema() - require.NoError(t, err) - table, err := colDB.Table( "stacktraces", frostdb.NewTableConfig(profile.SchemaDefinition()), @@ -63,7 +60,6 @@ func Test_LabelName_Error(t *testing.T) { logger, tracer, ingester, - schema, memory.DefaultAllocator, ) @@ -123,9 +119,6 @@ func BenchmarkProfileColumnStoreWriteSeries(b *testing.B) { colDB, err := col.DB(ctx, "parca") require.NoError(b, err) - schema, err := profile.Schema() - require.NoError(b, err) - table, err := colDB.Table( "stacktraces", frostdb.NewTableConfig(profile.SchemaDefinition()), @@ -142,7 +135,6 @@ func BenchmarkProfileColumnStoreWriteSeries(b *testing.B) { logger, tracer, ingester, - schema, memory.DefaultAllocator, ) diff --git a/pkg/query/columnquery_test.go b/pkg/query/columnquery_test.go index 799f840d11b..43be1463626 100644 --- a/pkg/query/columnquery_test.go +++ b/pkg/query/columnquery_test.go @@ -148,8 +148,6 @@ func TestColumnQueryAPIQueryRange(t *testing.T) { colDB, err := col.DB(context.Background(), "parca") require.NoError(t, err) - schema, err := profile.Schema() - require.NoError(t, err) table, err := colDB.Table( "stacktraces", @@ -170,7 +168,6 @@ func TestColumnQueryAPIQueryRange(t *testing.T) { logger, tracer, ingester, - schema, memory.DefaultAllocator, ) @@ -245,8 +242,6 @@ func TestColumnQueryAPIQuerySingle(t *testing.T) { colDB, err := col.DB(context.Background(), "parca") require.NoError(t, err) - schema, err := profile.Schema() - require.NoError(t, err) table, err := colDB.Table( "stacktraces", @@ -262,7 +257,6 @@ func TestColumnQueryAPIQuerySingle(t *testing.T) { logger, tracer, ingester, - schema, memory.DefaultAllocator, ) @@ -395,8 +389,6 @@ func TestColumnQueryAPIQueryFgprof(t *testing.T) { colDB, err := col.DB(context.Background(), "parca") require.NoError(t, err) - schema, err := profile.Schema() - require.NoError(t, err) table, err := colDB.Table( "stacktraces", @@ -417,7 +409,6 @@ func TestColumnQueryAPIQueryFgprof(t *testing.T) { logger, tracer, ingester, - schema, memory.DefaultAllocator, ) @@ -489,8 +480,6 @@ func TestColumnQueryAPIQueryCumulative(t *testing.T) { colDB, err := col.DB(context.Background(), "parca") require.NoError(t, err) - schema, err := profile.Schema() - require.NoError(t, err) table, err := colDB.Table( "stacktraces", @@ -508,7 +497,6 @@ func TestColumnQueryAPIQueryCumulative(t *testing.T) { logger, tracer, ingester, - schema, memory.DefaultAllocator, ) @@ -647,8 +635,6 @@ func TestColumnQueryAPIQueryDiff(t *testing.T) { colDB, err := col.DB(context.Background(), "parca") require.NoError(t, err) - schema, err := profile.Schema() - require.NoError(t, err) table, err := colDB.Table( "stacktraces", @@ -724,7 +710,7 @@ func TestColumnQueryAPIQueryDiff(t *testing.T) { RawProfile: MustCompressGzip(t, p), }}, }}, - }, schema) + }) require.NoError(t, err) require.NoError(t, ingester.Ingest(ctx, r1)) r1.Release() @@ -752,7 +738,7 @@ func TestColumnQueryAPIQueryDiff(t *testing.T) { RawProfile: MustCompressGzip(t, p), }}, }}, - }, schema) + }) require.NoError(t, err) require.NoError(t, ingester.Ingest(ctx, r2)) r2.Release() @@ -933,8 +919,6 @@ func TestColumnQueryAPITypes(t *testing.T) { colDB, err := col.DB(context.Background(), "parca") require.NoError(t, err) - schema, err := profile.Schema() - require.NoError(t, err) table, err := colDB.Table( "stacktraces", @@ -954,7 +938,6 @@ func TestColumnQueryAPITypes(t *testing.T) { logger, tracer, ingester, - schema, memory.DefaultAllocator, ) @@ -1031,8 +1014,6 @@ func TestColumnQueryAPILabelNames(t *testing.T) { colDB, err := col.DB(context.Background(), "parca") require.NoError(t, err) - schema, err := profile.Schema() - require.NoError(t, err) table, err := colDB.Table( "stacktraces", @@ -1052,7 +1033,6 @@ func TestColumnQueryAPILabelNames(t *testing.T) { logger, tracer, ingester, - schema, memory.DefaultAllocator, ) @@ -1119,8 +1099,6 @@ func TestColumnQueryAPILabelValues(t *testing.T) { colDB, err := col.DB(context.Background(), "parca") require.NoError(t, err) - schema, err := profile.Schema() - require.NoError(t, err) table, err := colDB.Table( "stacktraces", @@ -1140,7 +1118,6 @@ func TestColumnQueryAPILabelValues(t *testing.T) { logger, tracer, ingester, - schema, memory.DefaultAllocator, ) diff --git a/pkg/query/query_test.go b/pkg/query/query_test.go index b64f8c0b134..3db7f3fda3f 100644 --- a/pkg/query/query_test.go +++ b/pkg/query/query_test.go @@ -52,9 +52,6 @@ func Benchmark_Query_Merge(b *testing.B) { colDB, err := col.DB(context.Background(), "parca") require.NoError(b, err) - schema, err := profile.Schema() - require.NoError(b, err) - table, err := colDB.Table( "stacktraces", columnstore.NewTableConfig(profile.SchemaDefinition()), @@ -74,7 +71,6 @@ func Benchmark_Query_Merge(b *testing.B) { logger, tracer, ingester, - schema, memory.DefaultAllocator, )