From 1f52bb84a5dec1702661b8f7fef09537829f7592 Mon Sep 17 00:00:00 2001 From: Jacob Marble Date: Thu, 7 May 2026 01:14:51 +0000 Subject: [PATCH] materialize-s3-iceberg: sort tables by collection key on create --- materialize-s3-iceberg/catalog.go | 7 ++++++ .../iceberg-ctl/iceberg_ctl/__main__.py | 24 ++++++++++++++++++- materialize-s3-iceberg/icebergctl.go | 7 +++--- 3 files changed, 34 insertions(+), 4 deletions(-) diff --git a/materialize-s3-iceberg/catalog.go b/materialize-s3-iceberg/catalog.go index 809752cd8e..0b244b29a0 100644 --- a/materialize-s3-iceberg/catalog.go +++ b/materialize-s3-iceberg/catalog.go @@ -128,6 +128,13 @@ func (c *catalog) CreateResource(ctx context.Context, b *pf.MaterializationSpec_ }) } + // Field IDs are assigned 1..N by iceberg-ctl in the order columns appear + // here, which is FieldSelection.AllFields() == Keys ++ Values ++ Document. + // Sort by collection key so writers cluster rows by key within data files. + for i := 1; i <= len(b.FieldSelection.Keys); i++ { + tc.SortFieldIDs = append(tc.SortFieldIDs, i) + } + input, err := json.Marshal(tc) if err != nil { return "", nil, err diff --git a/materialize-s3-iceberg/iceberg-ctl/iceberg_ctl/__main__.py b/materialize-s3-iceberg/iceberg-ctl/iceberg_ctl/__main__.py index 19ffd1243d..9ed427bf8f 100644 --- a/materialize-s3-iceberg/iceberg-ctl/iceberg_ctl/__main__.py +++ b/materialize-s3-iceberg/iceberg-ctl/iceberg_ctl/__main__.py @@ -16,6 +16,8 @@ from pyiceberg.io import PY_IO_IMPL from pyiceberg.schema import Schema from pyiceberg.table import TableProperties +from pyiceberg.table.sorting import NullOrder, SortDirection, SortField, SortOrder +from pyiceberg.transforms import IdentityTransform from pyiceberg.types import ( BinaryType, BooleanType, @@ -308,6 +310,7 @@ class TableCreate(BaseModel, extra="forbid"): location: str fields: list[IcebergColumn] properties: dict[str, str] | None = None + sort_field_ids: list[int] | None = None @run.command() @@ -343,7 +346,26 @@ def create_table( if k == TableProperties.DEFAULT_NAME_MAPPING: continue properties[k] = v - catalog.create_table(table, schema, table_create.location, properties=properties) + + sort_order = SortOrder() + if table_create.sort_field_ids: + sort_order = SortOrder(*[ + SortField( + source_id=field_id, + transform=IdentityTransform(), + direction=SortDirection.ASC, + null_order=NullOrder.NULLS_FIRST, + ) + for field_id in table_create.sort_field_ids + ]) + + catalog.create_table( + table, + schema, + table_create.location, + sort_order=sort_order, + properties=properties, + ) log(f"create_table: created table {table}") diff --git a/materialize-s3-iceberg/icebergctl.go b/materialize-s3-iceberg/icebergctl.go index 9f62ca19b8..9570c66625 100644 --- a/materialize-s3-iceberg/icebergctl.go +++ b/materialize-s3-iceberg/icebergctl.go @@ -30,9 +30,10 @@ type existingIcebergColumn struct { } type tableCreate struct { - Fields []existingIcebergColumn `json:"fields"` - Location string `json:"location"` - Properties map[string]string `json:"properties,omitempty"` + Fields []existingIcebergColumn `json:"fields"` + Location string `json:"location"` + Properties map[string]string `json:"properties,omitempty"` + SortFieldIDs []int `json:"sort_field_ids,omitempty"` } type tableAlter struct {