Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

#### Bug Fixes

- Fixed a bug where `AsyncJob.result("no_result")` sometimes silently returned without raising an error for failed queries.

#### Improvements

- When `Session.reduce_describe_query_enabled` is enabled, fewer DESCRIBE queries are issued when the outer query only projects or renames columns from an inner subquery whose column types are already known.
Expand Down
5 changes: 5 additions & 0 deletions src/snowflake/snowpark/async_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,11 @@ def result(
# If we can advance in ASYNC_RETRY_PATTERN then do so
if retry_pattern_pos < (len(ASYNC_RETRY_PATTERN) - 1):
retry_pattern_pos += 1
# Without this post-loop check, a failed query would silently return None.
# The upstream `get_results_from_sfqid` only catches failures already visible
# at that single synchronous status check, and no fetch happens in NO_RESULT mode
# to trigger the prefetch hook.
self._session.connection.get_query_status_throw_if_error(self.query_id)
result = None
elif async_result_type == _AsyncResultType.PANDAS:
result = self._session._conn._to_data_or_iter(
Expand Down
30 changes: 30 additions & 0 deletions tests/integ/scala/test_async_job_suite.py
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,36 @@ def test_create_async_job_negative(session):
async_job.result()


def test_async_job_no_result_raises_on_failed_query(session):
    """Every failed query must surface as an exception from
    ``AsyncJob.result("no_result")`` — a silent ``None`` return is a bug.

    The failure mode under test is a race: repeated iterations give the
    timing-dependent path many chances to trigger.
    """
    # Warm the warehouse so that subsequent iterations are dominated by the race condition
    session.sql("select 1").collect()

    total_runs = 30
    silent_returns = 0  # iterations where result() returned instead of raising
    error_count = 0  # iterations where the expected exception was raised
    first_error_text = ""  # captured once, for the message-content assertion

    for _ in range(total_runs):
        job = session.sql("select 1/0").collect_nowait()
        try:
            job.result("no_result")
        except (DatabaseError, SnowparkSQLException) as err:
            error_count += 1
            if not first_error_text:
                first_error_text = str(err)
        else:
            silent_returns += 1

    assert silent_returns == 0, (
        f"AsyncJob.result('no_result') silently returned None for "
        f"{silent_returns}/{total_runs} failed division by zero queries. "
        "All failures must surface as exceptions"
    )
    assert error_count == total_runs
    assert (
        "Division by zero" in first_error_text
        or "FAILED_WITH_ERROR" in first_error_text
    ), f"Unexpected exception text: {first_error_text!r}"


@pytest.mark.skipif(IS_IN_STORED_PROC, reason="caplog is not supported")
@pytest.mark.parametrize("create_async_job_from_query_id", [True, False])
def test_get_query_from_async_job(session, create_async_job_from_query_id, caplog):
Expand Down
Loading