Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 3 additions & 6 deletions pkg/ingester/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func TestPprofToArrow(t *testing.T) {
}},
}

r, err := normalizer.WriteRawRequestToArrowRecord(ctx, mem, req, schema)
r, err := normalizer.WriteRawRequestToArrowRecord(ctx, mem, req)
require.NoError(t, err)
defer r.Release()
ingester := NewIngester(logger, table)
Expand Down Expand Up @@ -164,7 +164,7 @@ func TestUncompressedPprofToArrow(t *testing.T) {
}},
}

rec, err := normalizer.WriteRawRequestToArrowRecord(ctx, mem, req, schema)
rec, err := normalizer.WriteRawRequestToArrowRecord(ctx, mem, req)
require.NoError(t, err)
defer rec.Release()
ingester := NewIngester(logger, table)
Expand All @@ -183,9 +183,6 @@ func TestUncompressedPprofToArrow(t *testing.T) {
func BenchmarkNormalizeWriteRawRequest(b *testing.B) {
ctx := context.Background()

schema, err := profile.Schema()
require.NoError(b, err)

fileContent, err := os.ReadFile("../query/testdata/alloc_objects.pb.gz")
require.NoError(b, err)

Expand Down Expand Up @@ -214,7 +211,7 @@ func BenchmarkNormalizeWriteRawRequest(b *testing.B) {

b.ResetTimer()
for i := 0; i < b.N; i++ {
r, err := normalizer.WriteRawRequestToArrowRecord(ctx, mem, req, schema)
r, err := normalizer.WriteRawRequestToArrowRecord(ctx, mem, req)
if err != nil {
b.Fatal(err)
}
Expand Down
71 changes: 4 additions & 67 deletions pkg/normalizer/arrow.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,7 @@ import (

"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/array"
"github.com/apache/arrow-go/v18/arrow/compute"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/polarsignals/frostdb/dynparquet"
"github.com/polarsignals/frostdb/pqarrow/arrowutils"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

Expand Down Expand Up @@ -58,22 +55,19 @@ type Metrics struct {
type arrowToInternalConverter struct {
metrics *Metrics

mem memory.Allocator
schema *dynparquet.Schema
mem memory.Allocator

b *InternalRecordBuilderV1
}

func NewArrowToInternalConverter(
mem memory.Allocator,
schema *dynparquet.Schema,
metrics *Metrics,
) *arrowToInternalConverter {
return &arrowToInternalConverter{
metrics: metrics,

mem: mem,
schema: schema,
mem: mem,

b: &InternalRecordBuilderV1{},
}
Expand Down Expand Up @@ -379,7 +373,7 @@ func (r *InternalRecordBuilderV1) validate() error {
}

func (c *arrowToInternalConverter) NewRecord(ctx context.Context) (arrow.RecordBatch, error) {
newRecord := array.NewRecordBatch(
return array.NewRecordBatch(
arrow.NewSchema(append(c.b.LabelFields, []arrow.Field{{
Name: profile.ColumnName,
Type: &arrow.DictionaryType{IndexType: arrow.PrimitiveTypes.Uint32, ValueType: arrow.BinaryTypes.Binary},
Expand Down Expand Up @@ -429,64 +423,7 @@ func (c *arrowToInternalConverter) NewRecord(ctx context.Context) (arrow.RecordB
c.b.Value,
}...),
int64(c.b.Value.Len()),
)

sortingColDefs := c.schema.ColumnDefinitionsForSortingColumns()
sortingColumns := make([]arrowutils.SortingColumn, 0, len(sortingColDefs))
arrowSchema := newRecord.Schema()
arrowFields := arrowSchema.Fields()
for _, col := range c.schema.SortingColumns() {
direction := arrowutils.Ascending
if col.Descending() {
direction = arrowutils.Descending
}

colDef, found := c.schema.ColumnByName(col.ColumnName())
if !found {
return nil, fmt.Errorf("sorting column %v not found in schema", col.ColumnName())
}

if colDef.Dynamic {
for i, c := range arrowFields {
if strings.HasPrefix(c.Name, colDef.Name) {
sortingColumns = append(sortingColumns, arrowutils.SortingColumn{
Index: i,
Direction: direction,
NullsFirst: col.NullsFirst(),
})
}
}
} else {
indices := arrowSchema.FieldIndices(colDef.Name)
for _, i := range indices {
sortingColumns = append(sortingColumns, arrowutils.SortingColumn{
Index: i,
Direction: direction,
NullsFirst: col.NullsFirst(),
})
}
}
}

sortedIdxs, err := arrowutils.SortRecord(newRecord, sortingColumns)
if err != nil {
return nil, fmt.Errorf("failed to sort record: %w", err)
}
isSorted := true
for i := 0; i < sortedIdxs.Len(); i++ {
if sortedIdxs.Value(i) != int32(i) {
isSorted = false
break
}
}

if isSorted {
return newRecord, nil
}

// Release the record, since Take will allocate a new, sorted, record.
defer newRecord.Release()
return arrowutils.Take(compute.WithAllocator(ctx, c.mem), newRecord, sortedIdxs)
), nil
}

func (c *arrowToInternalConverter) AddLocationsRecordV1(
Expand Down
5 changes: 1 addition & 4 deletions pkg/normalizer/arrow_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,7 @@ func TestAddSampleRecordV2(t *testing.T) {
rec := buildV2SampleRecord(t, mem)
defer rec.Release()

dynSchema, err := profile.Schema()
require.NoError(t, err)

c := NewArrowToInternalConverter(mem, dynSchema, NewMetrics(prometheus.NewRegistry()))
c := NewArrowToInternalConverter(mem, NewMetrics(prometheus.NewRegistry()))
defer c.Release()

require.NoError(t, c.AddSampleRecord(context.Background(), rec))
Expand Down
156 changes: 3 additions & 153 deletions pkg/normalizer/normalizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,12 @@ import (
"io"
"slices"
"sort"
"strings"
"time"

"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/array"
"github.com/apache/arrow-go/v18/arrow/compute"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/gogo/status"
"github.com/parquet-go/parquet-go"
"github.com/polarsignals/frostdb/dynparquet"
"github.com/polarsignals/frostdb/pqarrow"
"github.com/polarsignals/frostdb/pqarrow/arrowutils"
"github.com/polarsignals/frostdb/query/logicalplan"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/util/strutil"
pprofextended "go.opentelemetry.io/proto/otlp/profiles/v1development"
Expand Down Expand Up @@ -119,7 +112,6 @@ func WriteRawRequestToArrowRecord(
ctx context.Context,
mem memory.Allocator,
req *profilestorepb.WriteRawRequest,
schema *dynparquet.Schema,
) (arrow.RecordBatch, error) {
normalizedRequest, err := NormalizeWriteRawRequest(
ctx,
Expand All @@ -129,18 +121,7 @@ func WriteRawRequestToArrowRecord(
return nil, err
}

ps, err := schema.GetDynamicParquetSchema(map[string][]string{
profile.ColumnLabels: normalizedRequest.AllLabelNames,
})
if err != nil {
return nil, err
}
defer schema.PutPooledParquetSchema(ps)

arrowSchema, err := pqarrow.ParquetSchemaToArrowSchema(ctx, ps.Schema, schema, logicalplan.IterOptions{})
if err != nil {
return nil, err
}
arrowSchema := profile.BuildArrowSchema(normalizedRequest.AllLabelNames)

b := array.NewRecordBuilder(mem, arrowSchema)
numRows := 0
Expand All @@ -154,7 +135,7 @@ func WriteRawRequestToArrowRecord(
b.Reserve(numRows)
defer b.Release()

for _, col := range schema.Columns() {
for _, col := range profile.SchemaDefinition().Columns {
switch col.Name {
case profile.ColumnDuration:
cBuilder := b.Field(b.Schema().FieldIndices(col.Name)[0]).(*array.Int64Builder)
Expand Down Expand Up @@ -332,61 +313,7 @@ func WriteRawRequestToArrowRecord(
return nil, nil
}

sortingColDefs := schema.ColumnDefinitionsForSortingColumns()
sortingColumns := make([]arrowutils.SortingColumn, 0, len(sortingColDefs))
arrowFields := arrowSchema.Fields()
for _, col := range schema.SortingColumns() {
direction := arrowutils.Ascending
if col.Descending() {
direction = arrowutils.Descending
}

colDef, found := schema.ColumnByName(col.ColumnName())
if !found {
return nil, fmt.Errorf("sorting column %v not found in schema", col.ColumnName())
}

if colDef.Dynamic {
for i, c := range arrowFields {
if strings.HasPrefix(c.Name, colDef.Name) {
sortingColumns = append(sortingColumns, arrowutils.SortingColumn{
Index: i,
Direction: direction,
NullsFirst: col.NullsFirst(),
})
}
}
} else {
indices := arrowSchema.FieldIndices(colDef.Name)
for _, i := range indices {
sortingColumns = append(sortingColumns, arrowutils.SortingColumn{
Index: i,
Direction: direction,
NullsFirst: col.NullsFirst(),
})
}
}
}

sortedIdxs, err := arrowutils.SortRecord(record, sortingColumns)
if err != nil {
return nil, fmt.Errorf("failed to sort record: %w", err)
}
isSorted := true
for i := 0; i < sortedIdxs.Len(); i++ {
if sortedIdxs.Value(i) != int32(i) {
isSorted = false
break
}
}

if isSorted {
return record, nil
}

// Release the record, since Take will allocate a new, sorted, record.
defer record.Release()
return arrowutils.Take(compute.WithAllocator(ctx, mem), record, sortedIdxs)
return record, nil
}

func NormalizePprof(
Expand Down Expand Up @@ -623,80 +550,3 @@ func LabelNamesFromSamples(
}
}

// SampleToParquetRow appends the Parquet values for a single normalized
// sample to row and returns the extended row. The passed labels must be
// sorted.
//
// The columns are visited in the order returned by schema.Columns(); every
// value is tagged with the index of the Parquet column it belongs to. Static
// columns contribute one column index each, while the dynamic labels column
// contributes one column index per entry of labelNames.
//
// profileLabelNames and profileNumLabelNames are accepted but not read by
// this function.
//
// It panics when the schema contains a column this function has no
// conversion for.
func SampleToParquetRow(
	schema *dynparquet.Schema,
	row parquet.Row,
	labelNames, profileLabelNames, profileNumLabelNames []string,
	lset map[string]string,
	meta profile.Meta,
	s *NormalizedSample,
) parquet.Row {
	// idx is the position of the Parquet column currently being written.
	// The original comment notes that schema.Columns() returns a sorted list
	// of all columns, so idx advances in step with that order.
	idx := 0

	for _, column := range schema.Columns() {
		// scalar holds the value of a required, non-repeated column. The
		// repeated (stacktrace) and dynamic (labels) columns write their own
		// values and skip the shared append at the bottom of the loop.
		var scalar any

		switch column.Name {
		case profile.ColumnDuration:
			scalar = meta.Duration
		case profile.ColumnName:
			scalar = meta.Name
		case profile.ColumnPeriod:
			scalar = meta.Period
		case profile.ColumnPeriodType:
			scalar = meta.PeriodType.Type
		case profile.ColumnPeriodUnit:
			scalar = meta.PeriodType.Unit
		case profile.ColumnSampleType:
			scalar = meta.SampleType.Type
		case profile.ColumnSampleUnit:
			scalar = meta.SampleType.Unit
		case profile.ColumnTimestamp:
			scalar = meta.Timestamp
		case profile.ColumnTimeNanos:
			scalar = meta.TimeNanos
		case profile.ColumnValue:
			scalar = s.Value

		case profile.ColumnStacktrace:
			// A sample without locations still needs a placeholder value so
			// the column is present in the row.
			if len(s.Locations) == 0 {
				row = append(row, parquet.ValueOf(nil).Level(0, 0, idx))
			}
			for i, loc := range s.Locations {
				// The first element is written with repetition level 0,
				// every following element with repetition level 1.
				repetition := 1
				if i == 0 {
					repetition = 0
				}
				row = append(row, parquet.ValueOf(loc).Level(repetition, 1, idx))
			}
			idx++
			continue

		case profile.ColumnLabels:
			// Dynamic column: one Parquet column per label name. Labels
			// missing from lset are written as a nil value at definition
			// level 0.
			for _, name := range labelNames {
				value, ok := lset[name]
				if !ok {
					row = append(row, parquet.ValueOf(nil).Level(0, 0, idx))
				} else {
					row = append(row, parquet.ValueOf(value).Level(0, 1, idx))
				}
				idx++
			}
			continue

		default:
			panic(fmt.Errorf("conversion not implement for column: %s", column.Name))
		}

		row = append(row, parquet.ValueOf(scalar).Level(0, 0, idx))
		idx++
	}

	return row
}
Loading
Loading