[BUG] PySpark Observer fails when used with Delta Tables #6534

@mniehoff

Description

Bug

Which Delta project/connector is this regarding

  • Spark
  • Standalone
  • Flink
  • Kernel
  • Other (fill in here)

Describe the problem

A PySpark Observation fails when used in combination with a Delta table:

An error occurred while calling z:org.apache.spark.sql.api.python.PythonSQLUtils.toPyRow.

Steps to reproduce

Run the following code with PySpark 4.1.1 and delta-spark 4.1.0:

from delta import configure_spark_with_delta_pip
from pyspark.sql import Observation, SparkSession
from pyspark.sql.functions import count, lit

builder = (
    SparkSession.builder
    .config("spark.sql.sources.default", "delta")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config(
        "spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    )
)

spark = configure_spark_with_delta_pip(builder).getOrCreate()

df = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "value"])

observation = Observation("write_metrics_batch")
observed_df = df.observe(observation, count(lit(1)).alias("num_inserted_rows"))

table_name = "default.test_obs_bug_table"
spark.sql(f"DROP TABLE IF EXISTS {table_name}")
observed_df.write.mode("overwrite").saveAsTable(table_name)
assert observation.get["num_inserted_rows"] == 2

Observed results

The code fails with an exception on observation.get:

_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
.venv/lib/python3.12/site-packages/pyspark/sql/observation.py:152: in get
    row: Row = CPickleSerializer().loads(utils.toPyRow(jrow))
                                         ^^^^^^^^^^^^^^^^^^^
.venv/lib/python3.12/site-packages/py4j/java_gateway.py:1362: in __call__
    return_value = get_return_value(
.venv/lib/python3.12/site-packages/pyspark/errors/exceptions/captured.py:263: in deco
    return f(*a, **kw)
           ^^^^^^^^^^^
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

answer = 'xro74'
gateway_client = <py4j.clientserver.JavaClient object at 0x10e58ef00>
target_id = 'z:org.apache.spark.sql.api.python.PythonSQLUtils', name = 'toPyRow'

    def get_return_value(answer, gateway_client, target_id=None, name=None):
        """Converts an answer received from the Java gateway into a Python object.
    
        For example, string representation of integers are converted to Python
        integer, string representation of objects are converted to JavaObject
        instances, etc.
    
        :param answer: the string returned by the Java gateway
        :param gateway_client: the gateway client used to communicate with the Java
            Gateway. Only necessary if the answer is a reference (e.g., object,
            list, map)
        :param target_id: the name of the object from which the answer comes from
            (e.g., *object1* in `object1.hello()`). Optional.
        :param name: the name of the member from which the answer comes from
            (e.g., *hello* in `object1.hello()`). Optional.
        """
        if is_error(answer)[0]:
            if len(answer) > 1:
                type = answer[1]
                value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
                if answer[1] == REFERENCE_TYPE:
>                   raise Py4JJavaError(
                        "An error occurred while calling {0}{1}{2}.\n".
                        format(target_id, ".", name), value)
E                   py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.sql.api.python.PythonSQLUtils.toPyRow.
E                   : java.lang.AssertionError: assertion failed
E                   	at scala.Predef$.assert(Predef.scala:264)
E                   	at org.apache.spark.sql.api.python.PythonSQLUtils$.toPyRow(PythonSQLUtils.scala:120)
E                   	at org.apache.spark.sql.api.python.PythonSQLUtils.toPyRow(PythonSQLUtils.scala)
E                   	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
E                   	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:75)
E                   	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:52)
E                   	at java.base/java.lang.reflect.Method.invoke(Method.java:580)
E                   	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
E                   	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
E                   	at py4j.Gateway.invoke(Gateway.java:282)
E                   	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
E                   	at py4j.commands.CallCommand.execute(CallCommand.java:79)
E                   	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:184)
E                   	at py4j.ClientServerConnection.run(ClientServerConnection.java:108)
E                   	at java.base/java.lang.Thread.run(Thread.java:1583)

.venv/lib/python3.12/site-packages/py4j/protocol.py:327: Py4JJavaError

Expected results

The code runs without error and the assertion passes.

Further details

The same code works fine with PySpark 4.0.2 and delta-spark 4.1.0, so my first idea was that it's a failure in PySpark.
But the same code with a standard Spark session works fine on both PySpark 4.0.2 and 4.1.1, so I guess the issue is on the Delta side.

Code without Delta, working on both PySpark 4.1.1 and 4.0.2:

from pyspark.sql import Observation, SparkSession
from pyspark.sql.functions import count, lit

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "value"])

observation = Observation("write_metrics_batch")
observed_df = df.observe(observation, count(lit(1)).alias("num_inserted_rows"))

table_name = "default.test_obs_bug_table"
spark.sql(f"DROP TABLE IF EXISTS {table_name}")
observed_df.write.mode("overwrite").saveAsTable(table_name)
assert observation.get["num_inserted_rows"] == 2
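As a possible interim workaround until the Observation path is fixed (an assumption on my part, not a confirmed fix): Delta records operation metrics, including numOutputRows, in the transaction log for write operations, so the inserted row count can be read back from the table history instead of the Observation. Continuing from the repro above (spark and table_name already defined):

# Workaround sketch (assumption, not verified on PySpark 4.1.1): read the
# row count of the most recent commit from the Delta transaction log
# instead of relying on Observation.get.
last_commit = spark.sql(f"DESCRIBE HISTORY {table_name} LIMIT 1").collect()[0]
num_inserted_rows = int(last_commit["operationMetrics"]["numOutputRows"])
assert num_inserted_rows == 2

Note this only covers write metrics that Delta itself records; arbitrary observe() expressions have no equivalent in the table history.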

Environment information

  • Delta Lake version: 4.1.0
  • Spark version: 4.1.1
  • Python version: 3.12.12

Willingness to contribute

The Delta Lake Community encourages bug fix contributions. Would you or another member of your organization be willing to contribute a fix for this bug to the Delta Lake code base?

  • Yes. I can contribute a fix for this bug independently.
  • Yes. I would be willing to contribute a fix for this bug with guidance from the Delta Lake community. But I don't have any clue where to look.
  • No. I cannot contribute a bug fix at this time.
