Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/snowflake/snowpark/_internal/proto/ast.proto
Original file line number Diff line number Diff line change
Expand Up @@ -2267,6 +2267,7 @@ message ReadTable {
google.protobuf.StringValue time_travel_mode = 7;
Expr timestamp = 8;
google.protobuf.StringValue timestamp_type = 9;
google.protobuf.StringValue iceberg_tag = 10;
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the client change generated from the monorepo AST definition?

I remember that a client AST change isn't strictly required right now; if it's needed, it should be derived from the monorepo updates -- there is a step-by-step doc for the AST monorepo updates.

@sfc-gh-heshah what's your recommendation here? Can we leave the AST change out of this PR and do it later?

}

// dataframe-io.ir:54
Expand Down Expand Up @@ -2469,6 +2470,7 @@ message Table {
Expr timestamp = 8;
google.protobuf.StringValue timestamp_type = 9;
TableVariant variant = 10;
google.protobuf.StringValue iceberg_tag = 11;
}

// table.ir:1
Expand Down
19 changes: 16 additions & 3 deletions src/snowflake/snowpark/_internal/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -1955,6 +1955,7 @@ class TimeTravelConfig(NamedTuple):
timestamp: Optional[str] = None
timestamp_type: Optional[str] = None
stream: Optional[str] = None
iceberg_tag: Optional[str] = None

@staticmethod
def validate_and_normalize_params(
Expand All @@ -1964,6 +1965,7 @@ def validate_and_normalize_params(
timestamp: Optional[Union[str, datetime.datetime]] = None,
timestamp_type: Optional[Union[str, "TimestampTimeZone"]] = None,
stream: Optional[str] = None,
iceberg_tag: Optional[str] = None,
) -> Optional["TimeTravelConfig"]:
"""
Validates and normalizes time travel parameters.
Expand All @@ -1986,7 +1988,8 @@ def validate_and_normalize_params(
ValueError: If parameters are invalid.
"""
time_travel_arg_count = sum(
arg is not None for arg in (statement, offset, timestamp, stream)
arg is not None
for arg in (statement, offset, timestamp, stream, iceberg_tag)
)

# Validate mode
Expand All @@ -2003,10 +2006,16 @@ def validate_and_normalize_params(
f"Invalid time travel mode: {time_travel_mode}. Must be 'at' or 'before'."
)

# Validate iceberg_tag can only be used with 'at' mode
if iceberg_tag is not None and time_travel_mode.lower() != "at":
raise ValueError(
"Iceberg tag time travel can only be used with time_travel_mode='at', not 'before'."
)

# Validate exactly one parameter is provided
if time_travel_arg_count != 1:
raise ValueError(
"Exactly one of 'statement', 'offset', 'timestamp', or 'stream' must be provided."
"Exactly one of 'statement', 'offset', 'timestamp', 'stream', or 'iceberg_tag' must be provided."
)

# Normalize timestamp
Expand Down Expand Up @@ -2040,6 +2049,7 @@ def validate_and_normalize_params(
timestamp=normalized_timestamp,
timestamp_type=timestamp_type,
stream=stream,
iceberg_tag=iceberg_tag,
)

def generate_sql_clause(self) -> str:
Expand All @@ -2048,7 +2058,8 @@ def generate_sql_clause(self) -> str:
Args:
config: Time travel configuration.
Returns:
SQL clause like " AT (TIMESTAMP => TO_TIMESTAMP_NTZ('...'))"
SQL clause like " AT (TIMESTAMP => TO_TIMESTAMP_NTZ('...'))" or
" AT (ICEBERG_TAG => 'tag_name')" for Iceberg tables.
"""
clause = f" {self.time_travel_mode.upper()} "

Expand All @@ -2058,6 +2069,8 @@ def generate_sql_clause(self) -> str:
clause += f"(OFFSET => {self.offset})"
elif self.stream is not None:
clause += f"(STREAM => '{self.stream}')"
elif self.iceberg_tag is not None:
clause += f"(ICEBERG_TAG => '{self.iceberg_tag}')"
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm unable to find this feature in the public docs.
What is the release status of this Iceberg tag feature?

elif self.timestamp is not None:
if self.timestamp_type is not None:
timestamp_type = self.timestamp_type.upper()
Expand Down
47 changes: 44 additions & 3 deletions src/snowflake/snowpark/dataframe_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,8 @@
"TIMESTAMP": "timestamp",
"TIMESTAMP_TYPE": "timestamp_type",
"STREAM": "stream",
"ICEBERG_TAG": "iceberg_tag",
"TAG": "iceberg_tag",
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When using `tag` as an alias for `iceberg_tag`, is that mapping defined in the Snowflake Iceberg spec, or is it from the Spark spec?

Snowflake has its own "tag" concept; will there be a future conflict in the context of the DataFrame reader?

}

READER_OPTIONS_ALIAS_MAP = {
Expand Down Expand Up @@ -162,6 +164,10 @@ def _extract_time_travel_from_options(options: dict) -> dict:
- Automatically sets time_travel_mode to 'at'
- Cannot be used with time_travel_mode='before' (raises error)
- Cannot be mixed with regular 'timestamp' option (raises error)

Special handling for 'TAG' or 'ICEBERG_TAG' (Spark Iceberg compatibility):
- Automatically sets time_travel_mode to 'at' (Iceberg tags only work with AT)
- Cannot be used with time_travel_mode='before' (raises error)
"""
result = {}
excluded_keys = set()
Expand All @@ -183,6 +189,22 @@ def _extract_time_travel_from_options(options: dict) -> dict:
result["timestamp"] = options["AS-OF-TIMESTAMP"]
excluded_keys.add("TIMESTAMP")

# Handle Iceberg tag option (Spark Iceberg compatibility)
tag_value = options.get("TAG") or options.get("ICEBERG_TAG")
if tag_value is not None:
if (
"TIME_TRAVEL_MODE" in options
and options["TIME_TRAVEL_MODE"].lower() == "before"
):
raise ValueError(
"Cannot use 'tag' option with time_travel_mode='before'. "
"Iceberg tags only work with time_travel_mode='at'."
)
result["time_travel_mode"] = "at"
result["iceberg_tag"] = tag_value
excluded_keys.add("ICEBERG_TAG")
excluded_keys.add("TAG")

for option_key, param_name in _TIME_TRAVEL_OPTIONS_PARAMS_MAP.items():
if option_key in options and option_key not in excluded_keys:
result[param_name] = options[option_key]
Expand Down Expand Up @@ -549,6 +571,7 @@ def table(
timestamp: Optional[Union[str, datetime]] = None,
timestamp_type: Optional[Union[str, TimestampTimeZone]] = None,
stream: Optional[str] = None,
iceberg_tag: Optional[str] = None,
) -> Table:
"""Returns a Table that points to the specified table.

Expand All @@ -568,6 +591,9 @@ def table(
timestamp_type: Type of timestamp interpretation ('NTZ', 'LTZ', or 'TZ').
Can also be set via ``option("timestamp_type", "LTZ")``.
stream: Stream name for time travel. Can also be set via ``option("stream", "stream_name")``.
iceberg_tag: Iceberg snapshot tag name for time travel on Iceberg tables.
Can also be set via ``option("tag", "tag_name")`` or
``option("iceberg_tag", "tag_name")``. Automatically sets time_travel_mode='at'.

Note:
Time travel options can be set either as direct parameters or via the
Expand All @@ -577,6 +603,9 @@ def table(
PySpark Compatibility: The ``as-of-timestamp`` option automatically sets
``time_travel_mode="at"`` and cannot be used with ``time_travel_mode="before"``.

Spark Iceberg Compatibility: The ``tag`` option automatically sets
``time_travel_mode="at"`` for Iceberg tag-based time travel.

Examples::

# Using direct parameters
Expand All @@ -591,6 +620,9 @@ def table(
# PySpark-style as-of-timestamp (automatically sets mode to "at")
>>> table = session.read.option("as-of-timestamp", "2023-01-01 12:00:00").table("my_table") # doctest: +SKIP

# Iceberg tag-based time travel (automatically sets mode to "at")
>>> table = session.read.option("tag", "release_v1").table("my_iceberg_table") # doctest: +SKIP

# timestamp_type automatically set to "TZ" due to timezone info
>>> import datetime, pytz # doctest: +SKIP
>>> tz_aware = datetime.datetime(2023, 1, 1, 12, 0, 0, tzinfo=pytz.UTC) # doctest: +SKIP
Expand Down Expand Up @@ -625,15 +657,24 @@ def table(
ast.timestamp_type.value = str(timestamp_type)
if stream is not None:
ast.stream.value = stream

if time_travel_mode is not None:
if iceberg_tag is not None and hasattr(ast, "iceberg_tag"):
ast.iceberg_tag.value = iceberg_tag

if time_travel_mode is not None or iceberg_tag is not None:
# If iceberg_tag is provided but time_travel_mode is not, default to 'at'
effective_mode = (
time_travel_mode
if time_travel_mode
else ("at" if iceberg_tag else None)
)
time_travel_params = {
"time_travel_mode": time_travel_mode,
"time_travel_mode": effective_mode,
"statement": statement,
"offset": offset,
"timestamp": timestamp,
"timestamp_type": timestamp_type,
"stream": stream,
"iceberg_tag": iceberg_tag,
}
else:
# if time_travel_mode is not provided, extract time travel config from options
Expand Down
10 changes: 9 additions & 1 deletion src/snowflake/snowpark/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -2727,6 +2727,7 @@ def table(
timestamp: Optional[Union[str, datetime.datetime]] = None,
timestamp_type: Optional[Union[str, TimestampTimeZone]] = None,
stream: Optional[str] = None,
iceberg_tag: Optional[str] = None,
) -> Table:
"""
Returns a Table that points the specified table.
Expand All @@ -2738,12 +2739,15 @@ def table(
_emit_ast: Whether to emit AST statements.

time_travel_mode: Time travel mode, either 'at' or 'before'.
Exactly one of statement, offset, timestamp, or stream must be provided when time_travel_mode is set.
Exactly one of statement, offset, timestamp, stream, or iceberg_tag must be provided when time_travel_mode is set.
statement: Query ID for time travel.
offset: Negative integer representing seconds in the past for time travel.
timestamp: Timestamp string or datetime object.
timestamp_type: Type of timestamp interpretation ('NTZ', 'LTZ', or 'TZ').
stream: Stream name for time travel.
iceberg_tag: Iceberg snapshot tag name for time travel on Iceberg tables.
Can only be used with time_travel_mode='at'. Generates SQL clause
like ``AT(ICEBERG_TAG => 'tag_name')``.

Note:
If your table name contains special characters, use double quotes to mark it like this, ``session.table('"my table"')``.
Expand All @@ -2765,6 +2769,7 @@ def table(
>>> df_before = session.table("my_table", time_travel_mode="before", statement="01234567-abcd-1234-5678-123456789012") # doctest: +SKIP
>>> df_offset = session.table("my_table", time_travel_mode="at", offset=-3600) # doctest: +SKIP
>>> df_stream = session.table("my_table", time_travel_mode="at", stream="my_stream") # doctest: +SKIP
>>> df_iceberg_tag = session.table("my_iceberg_table", time_travel_mode="at", iceberg_tag="release_v1") # doctest: +SKIP

# timestamp_type automatically set to "TZ" due to timezone info
>>> import datetime, pytz # doctest: +SKIP
Expand Down Expand Up @@ -2792,6 +2797,8 @@ def table(
ast.timestamp_type.value = str(timestamp_type)
if stream is not None:
ast.stream.value = stream
if iceberg_tag is not None and hasattr(ast, "iceberg_tag"):
ast.iceberg_tag.value = iceberg_tag
else:
stmt = None

Expand All @@ -2810,6 +2817,7 @@ def table(
timestamp=timestamp,
timestamp_type=timestamp_type,
stream=stream,
iceberg_tag=iceberg_tag,
)
# Replace API call origin for table
set_api_call_source(t, "Session.table")
Expand Down
4 changes: 4 additions & 0 deletions src/snowflake/snowpark/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,7 @@ def __init__(
timestamp: Optional[Union[str, datetime.datetime]] = None,
timestamp_type: Optional[Union[str, TimestampTimeZone]] = None,
stream: Optional[str] = None,
iceberg_tag: Optional[str] = None,
) -> None:
if _ast_stmt is None and session is not None and _emit_ast:
_ast_stmt = session._ast_batch.bind()
Expand All @@ -320,6 +321,8 @@ def __init__(
ast.timestamp_type.value = str(timestamp_type)
if stream is not None:
ast.stream.value = stream
if iceberg_tag is not None and hasattr(ast, "iceberg_tag"):
ast.iceberg_tag.value = iceberg_tag

time_travel_config = TimeTravelConfig.validate_and_normalize_params(
time_travel_mode=time_travel_mode,
Expand All @@ -328,6 +331,7 @@ def __init__(
timestamp=timestamp,
timestamp_type=timestamp_type,
stream=stream,
iceberg_tag=iceberg_tag,
)

snowflake_table_plan = SnowflakeTable(
Expand Down
40 changes: 40 additions & 0 deletions tests/unit/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -877,3 +877,43 @@ def test_generate_time_travel_sql_clause():
)
sql_clause = config.generate_sql_clause()
assert sql_clause == " AT (STATEMENT => 'abc123')"


def test_iceberg_tag_time_travel():
    """Exercise Iceberg tag handling in TimeTravelConfig.

    Checks that a tag with ``time_travel_mode="at"`` is accepted and rendered
    into the expected SQL clause, and that ``"before"`` mode or a second time
    travel argument raises ``ValueError``.
    """
    build = TimeTravelConfig.validate_and_normalize_params

    # Happy path: tag combined with 'at' mode.
    at_config = build(time_travel_mode="at", iceberg_tag="release_v1")
    assert at_config.time_travel_mode == "at"
    assert at_config.iceberg_tag == "release_v1"
    assert at_config.generate_sql_clause() == " AT (ICEBERG_TAG => 'release_v1')"

    # Tag name containing hyphens.
    hyphenated = build(time_travel_mode="at", iceberg_tag="snapshot-2023-01-01")
    assert (
        hyphenated.generate_sql_clause()
        == " AT (ICEBERG_TAG => 'snapshot-2023-01-01')"
    )

    # 'before' mode must be rejected for tags.
    with pytest.raises(
        ValueError, match="Iceberg tag time travel can only be used with"
    ):
        build(time_travel_mode="before", iceberg_tag="release_v1")

    # A tag may not be combined with any other time travel argument.
    conflicting_args = (
        {"timestamp": "2023-01-01 12:00:00"},
        {"offset": -3600},
    )
    for extra in conflicting_args:
        with pytest.raises(ValueError, match="Exactly one of"):
            build(time_travel_mode="at", iceberg_tag="release_v1", **extra)
Loading