Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions materialize-s3-iceberg/catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,13 @@ func (c *catalog) CreateResource(ctx context.Context, b *pf.MaterializationSpec_
})
}

// Field IDs are assigned 1..N by iceberg-ctl in the order columns appear
// here, which is FieldSelection.AllFields() == Keys ++ Values ++ Document.
// Sort by collection key so writers cluster rows by key within data files.
for i := 1; i <= len(b.FieldSelection.Keys); i++ {
tc.SortFieldIDs = append(tc.SortFieldIDs, i)
}

input, err := json.Marshal(tc)
if err != nil {
return "", nil, err
Expand Down
24 changes: 23 additions & 1 deletion materialize-s3-iceberg/iceberg-ctl/iceberg_ctl/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
from pyiceberg.io import PY_IO_IMPL
from pyiceberg.schema import Schema
from pyiceberg.table import TableProperties
from pyiceberg.table.sorting import NullOrder, SortDirection, SortField, SortOrder
from pyiceberg.transforms import IdentityTransform
from pyiceberg.types import (
BinaryType,
BooleanType,
Expand Down Expand Up @@ -308,6 +310,7 @@ class TableCreate(BaseModel, extra="forbid"):
location: str
fields: list[IcebergColumn]
properties: dict[str, str] | None = None
sort_field_ids: list[int] | None = None


@run.command()
Expand Down Expand Up @@ -343,7 +346,26 @@ def create_table(
if k == TableProperties.DEFAULT_NAME_MAPPING:
continue
properties[k] = v
catalog.create_table(table, schema, table_create.location, properties=properties)

sort_order = SortOrder()
if table_create.sort_field_ids:
sort_order = SortOrder(*[
SortField(
source_id=field_id,
transform=IdentityTransform(),
direction=SortDirection.ASC,
null_order=NullOrder.NULLS_FIRST,
)
for field_id in table_create.sort_field_ids
])

catalog.create_table(
table,
schema,
table_create.location,
sort_order=sort_order,
properties=properties,
)
log(f"create_table: created table {table}")


Expand Down
7 changes: 4 additions & 3 deletions materialize-s3-iceberg/icebergctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,10 @@ type existingIcebergColumn struct {
}

type tableCreate struct {
Fields []existingIcebergColumn `json:"fields"`
Location string `json:"location"`
Properties map[string]string `json:"properties,omitempty"`
Fields []existingIcebergColumn `json:"fields"`
Location string `json:"location"`
Properties map[string]string `json:"properties,omitempty"`
SortFieldIDs []int `json:"sort_field_ids,omitempty"`
}

type tableAlter struct {
Expand Down
Loading