Commit 52b2070

feat: add dictionary_columns to scan API for memory-efficient string reads
1 parent 1a54e9c commit 52b2070

4 files changed

Lines changed: 325 additions & 53 deletions
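
The memory win in the commit title comes from Arrow's dictionary encoding: each distinct string is stored once and rows hold small integer indices into it. A quick standalone sketch with synthetic data (sizes are illustrative, not from this commit):

import pyarrow as pa

# 100_000 rows drawn from 100 distinct ~1 KB strings. Plain storage repeats
# every value; dictionary storage keeps each distinct value once plus int32 indices.
values = [f"blob-{i:03d}" * 128 for i in range(100)]
plain = pa.array([values[i % 100] for i in range(100_000)])
encoded = plain.dictionary_encode()
print(f"plain:      {plain.nbytes / 1e6:.1f} MB")   # ~103 MB
print(f"dictionary: {encoded.nbytes / 1e6:.1f} MB")  # ~0.5 MB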

pyiceberg/io/pyarrow.py

Lines changed: 20 additions & 1 deletion
@@ -1614,8 +1614,13 @@ def _task_to_record_batches(
     partition_spec: PartitionSpec | None = None,
     format_version: TableVersion = TableProperties.DEFAULT_FORMAT_VERSION,
     downcast_ns_timestamp_to_us: bool | None = None,
+    dictionary_columns: tuple[str, ...] | None = None,
 ) -> Iterator[pa.RecordBatch]:
-    arrow_format = _get_file_format(task.file.file_format, pre_buffer=True, buffer_size=(ONE_MEGABYTE * 8))
+    # Only pass dictionary_columns for Parquet — ORC does not support this kwarg.
+    format_kwargs: dict[str, Any] = {"pre_buffer": True, "buffer_size": ONE_MEGABYTE * 8}
+    if dictionary_columns and task.file.file_format == FileFormat.PARQUET:
+        format_kwargs["dictionary_columns"] = dictionary_columns
+    arrow_format = _get_file_format(task.file.file_format, **format_kwargs)
     with io.new_input(task.file.file_path).open() as fin:
         fragment = arrow_format.make_fragment(fin)
         physical_schema = fragment.physical_schema
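
The guard above relies on PyArrow routing ParquetFileFormat keyword arguments into its read options: dictionary_columns lands in ParquetReadOptions and makes the listed columns come back as DictionaryArray. A minimal standalone sketch of that behavior (the /tmp path and column name are illustrative):

import pyarrow as pa
import pyarrow.dataset as ds
import pyarrow.parquet as pq

# Write a tiny Parquet file with a repetitive string column.
pq.write_table(pa.table({"payload": ["a", "b", "a", "a"]}), "/tmp/demo.parquet")

# ParquetFileFormat forwards dictionary_columns to ParquetReadOptions, so the
# listed columns are materialized dictionary-encoded. ORC's format class has
# no such kwarg, hence the FileFormat.PARQUET check above.
fmt = ds.ParquetFileFormat(dictionary_columns=("payload",))
table = ds.dataset("/tmp/demo.parquet", format=fmt).to_table()
assert pa.types.is_dictionary(table.schema.field("payload").type)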
@@ -1718,6 +1723,7 @@ class ArrowScan:
     _case_sensitive: bool
     _limit: int | None
     _downcast_ns_timestamp_to_us: bool | None
+    _dictionary_columns: tuple[str, ...] | None
     """Scan the Iceberg Table and create an Arrow construct.

     Attributes:
@@ -1737,6 +1743,8 @@ def __init__(
         row_filter: BooleanExpression,
         case_sensitive: bool = True,
         limit: int | None = None,
+        *,
+        dictionary_columns: tuple[str, ...] | None = None,
     ) -> None:
         self._table_metadata = table_metadata
         self._io = io
@@ -1745,6 +1753,7 @@ def __init__(
         self._case_sensitive = case_sensitive
         self._limit = limit
         self._downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE)
+        self._dictionary_columns = dictionary_columns

     @property
     def _projected_field_ids(self) -> set[int]:
@@ -1773,6 +1782,15 @@ def to_table(self, tasks: Iterable[FileScanTask]) -> pa.Table:
             ValueError: When a field type in the file cannot be projected to the schema type
         """
         arrow_schema = schema_to_pyarrow(self._projected_schema, include_field_ids=False)
+        if self._dictionary_columns:
+            dict_cols_set = set(self._dictionary_columns)
+            arrow_schema = pa.schema(
+                [
+                    field.with_type(pa.dictionary(pa.int32(), field.type)) if field.name in dict_cols_set else field
+                    for field in arrow_schema
+                ],
+                metadata=arrow_schema.metadata,
+            )

         batches = self.to_record_batches(tasks)
         try:
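
The schema rewrite above, in isolation: fields named in dictionary_columns are wrapped in pa.dictionary(pa.int32(), ...) so the result schema agrees with what the Parquet reader now produces. A small sketch with assumed field names:

import pyarrow as pa

schema = pa.schema([pa.field("id", pa.int64()), pa.field("payload", pa.string())])
dict_cols_set = {"payload"}
rebuilt = pa.schema(
    [f.with_type(pa.dictionary(pa.int32(), f.type)) if f.name in dict_cols_set else f for f in schema],
    metadata=schema.metadata,
)
print(rebuilt.field("payload").type)  # dictionary<values=string, indices=int32, ordered=0>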
@@ -1855,6 +1873,7 @@ def _record_batches_from_scan_tasks_and_deletes(
                 self._table_metadata.specs().get(task.file.spec_id),
                 self._table_metadata.format_version,
                 self._downcast_ns_timestamp_to_us,
+                dictionary_columns=self._dictionary_columns,
             )
             for batch in batches:
                 if self._limit is not None:

pyiceberg/table/__init__.py

Lines changed: 79 additions & 7 deletions
@@ -1121,6 +1121,7 @@ def scan(
         snapshot_id: int | None = None,
         options: Properties = EMPTY_DICT,
         limit: int | None = None,
+        dictionary_columns: tuple[str, ...] | None = None,
     ) -> DataScan:
         """Fetch a DataScan based on the table's current metadata.

@@ -1147,6 +1148,13 @@ def scan(
                 An integer representing the number of rows to
                 return in the scan result. If None, fetches all
                 matching rows.
+            dictionary_columns:
+                A tuple of column names that PyArrow should read as
+                dictionary-encoded (DictionaryArray). Reduces memory
+                usage for columns with large or repeated string values
+                (e.g. large JSON blobs). Only applies to Parquet files;
+                silently ignored for ORC. Columns absent from the file
+                are silently skipped. Default is None (no dictionary encoding).

         Returns:
             A DataScan based on the table's current metadata.
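
Based on the signature above, usage would look like this (catalog, table, and column names are hypothetical):

from pyiceberg.catalog import load_catalog

catalog = load_catalog("default")
tbl = catalog.load_table("logs.events")

# Read the large "payload" column dictionary-encoded to cut scan memory.
arrow_table = tbl.scan(
    selected_fields=("event_id", "payload"),
    dictionary_columns=("payload",),
).to_arrow()
print(arrow_table.schema.field("payload").type)  # a dictionary type, e.g. dictionary<values=string, indices=int32>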
@@ -1162,6 +1170,7 @@ def scan(
             limit=limit,
             catalog=self.catalog,
             table_identifier=self._identifier,
+            dictionary_columns=dictionary_columns,
         )

     @property
@@ -1664,6 +1673,7 @@ def scan(
         snapshot_id: int | None = None,
         options: Properties = EMPTY_DICT,
         limit: int | None = None,
+        dictionary_columns: tuple[str, ...] | None = None,
     ) -> DataScan:
         raise ValueError("Cannot scan a staged table")

@@ -1749,16 +1759,20 @@ def projection(self) -> Schema:
         return current_schema.select(*self.selected_fields, case_sensitive=self.case_sensitive)

     @abstractmethod
-    def plan_files(self) -> Iterable[ScanTask]: ...
+    def plan_files(self) -> Iterable[ScanTask]:
+        ...

     @abstractmethod
-    def to_arrow(self) -> pa.Table: ...
+    def to_arrow(self) -> pa.Table:
+        ...

     @abstractmethod
-    def to_pandas(self, **kwargs: Any) -> pd.DataFrame: ...
+    def to_pandas(self, **kwargs: Any) -> pd.DataFrame:
+        ...

     @abstractmethod
-    def to_polars(self) -> pl.DataFrame: ...
+    def to_polars(self) -> pl.DataFrame:
+        ...

     def update(self: S, **overrides: Any) -> S:
         """Create a copy of this table scan with updated fields."""
@@ -1791,7 +1805,8 @@ def with_case_sensitive(self: S, case_sensitive: bool = True) -> S:
         return self.update(case_sensitive=case_sensitive)

     @abstractmethod
-    def count(self) -> int: ...
+    def count(self) -> int:
+        ...


 class ScanTask:
@@ -1916,6 +1931,36 @@ def _min_sequence_number(manifests: list[ManifestFile]) -> int:


 class DataScan(TableScan):
+    dictionary_columns: tuple[str, ...] | None
+
+    def __init__(
+        self,
+        table_metadata: TableMetadata,
+        io: FileIO,
+        row_filter: str | BooleanExpression = ALWAYS_TRUE,
+        selected_fields: tuple[str, ...] = ("*",),
+        case_sensitive: bool = True,
+        snapshot_id: int | None = None,
+        options: Properties = EMPTY_DICT,
+        limit: int | None = None,
+        catalog: Catalog | None = None,
+        table_identifier: Identifier | None = None,
+        dictionary_columns: tuple[str, ...] | None = None,
+    ) -> None:
+        super().__init__(
+            table_metadata=table_metadata,
+            io=io,
+            row_filter=row_filter,
+            selected_fields=selected_fields,
+            case_sensitive=case_sensitive,
+            snapshot_id=snapshot_id,
+            options=options,
+            limit=limit,
+            catalog=catalog,
+            table_identifier=table_identifier,
+        )
+        self.dictionary_columns = dictionary_columns
+
     def _build_partition_projection(self, spec_id: int) -> BooleanExpression:
         project = inclusive_projection(self.table_metadata.schema(), self.table_metadata.specs()[spec_id], self.case_sensitive)
         return project(self.row_filter)
@@ -2113,7 +2158,13 @@ def to_arrow(self) -> pa.Table:
         from pyiceberg.io.pyarrow import ArrowScan

         return ArrowScan(
-            self.table_metadata, self.io, self.projection(), self.row_filter, self.case_sensitive, self.limit
+            self.table_metadata,
+            self.io,
+            self.projection(),
+            self.row_filter,
+            self.case_sensitive,
+            self.limit,
+            dictionary_columns=self.dictionary_columns,
         ).to_table(self.plan_files())

     def to_arrow_batch_reader(self) -> pa.RecordBatchReader:
@@ -2132,8 +2183,29 @@ def to_arrow_batch_reader(self) -> pa.RecordBatchReader:
         from pyiceberg.io.pyarrow import ArrowScan, schema_to_pyarrow

         target_schema = schema_to_pyarrow(self.projection())
+
+        # When dictionary_columns is set, PyArrow returns DictionaryArray for those columns.
+        # target_schema uses plain string types, so .cast(target_schema) would silently decode
+        # them back to plain strings. Rebuild target_schema with dictionary types for the listed
+        # columns so from_batches and cast both preserve the encoding.
+        if self.dictionary_columns:
+            dict_cols_set = set(self.dictionary_columns)
+            target_schema = pa.schema(
+                [
+                    field.with_type(pa.dictionary(pa.int32(), field.type)) if field.name in dict_cols_set else field
+                    for field in target_schema
+                ],
+                metadata=target_schema.metadata,
+            )
+
         batches = ArrowScan(
-            self.table_metadata, self.io, self.projection(), self.row_filter, self.case_sensitive, self.limit
+            self.table_metadata,
+            self.io,
+            self.projection(),
+            self.row_filter,
+            self.case_sensitive,
+            self.limit,
+            dictionary_columns=self.dictionary_columns,
         ).to_record_batches(self.plan_files())

         return pa.RecordBatchReader.from_batches(
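
The comment in the hunk above is easy to verify in isolation: casting to a schema with plain string types silently decodes dictionary columns, while casting to a matching dictionary type preserves them. A short sketch:

import pyarrow as pa

dict_col = pa.array(["a", "b", "a", "a"]).dictionary_encode()
table = pa.table({"payload": dict_col})

# Cast to a plain-string schema: the dictionary encoding is silently dropped.
plain_schema = pa.schema([pa.field("payload", pa.string())])
assert table.cast(plain_schema).schema.field("payload").type == pa.string()

# Cast to a dictionary-typed schema: the encoding survives.
dict_schema = pa.schema([pa.field("payload", pa.dictionary(pa.int32(), pa.string()))])
assert pa.types.is_dictionary(table.cast(dict_schema).schema.field("payload").type)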
