diff --git a/src/storage/src/storage_state/async_storage_worker.rs b/src/storage/src/storage_state/async_storage_worker.rs index f50c130834f4d..d1587322225be 100644 --- a/src/storage/src/storage_state/async_storage_worker.rs +++ b/src/storage/src/storage_state/async_storage_worker.rs @@ -320,6 +320,9 @@ impl } } } + tracing::info!( + "== as_of:{as_of:?} since:{remap_since:?} resume_uppers:{resume_uppers:?}" + ); /// Convenience function to convert `BTreeMap>` to /// `BTreeMap>`. diff --git a/test/pg-cdc/mzcompose.py b/test/pg-cdc/mzcompose.py index 758976aff51fd..5bb11a3a1d04d 100644 --- a/test/pg-cdc/mzcompose.py +++ b/test/pg-cdc/mzcompose.py @@ -12,6 +12,7 @@ """ import glob +import random import time from textwrap import dedent @@ -364,7 +365,9 @@ def workflow_large_scale(c: Composition, parser: WorkflowArgumentParser) -> None # Set up the Postgres server with the initial records, set up the connection to # the Postgres server in Materialize. - c.testdrive(dedent(""" + c.testdrive( + dedent( + """ $ postgres-execute connection=postgres://postgres:postgres@postgres ALTER USER postgres WITH replication; DROP SCHEMA IF EXISTS public CASCADE; @@ -381,15 +384,19 @@ def workflow_large_scale(c: Composition, parser: WorkflowArgumentParser) -> None CREATE PUBLICATION mz_source FOR ALL TABLES; > DROP SOURCE IF EXISTS s1 CASCADE; - """)) + """ + ) + ) def make_inserts(c: Composition, start: int, batch_num: int): c.testdrive( args=["--no-reset"], - input=dedent(f""" + input=dedent( + f""" $ postgres-execute connection=postgres://postgres:postgres@postgres INSERT INTO products (id, name, merchant_id, price, status, created_at, recordSizePayload) SELECT {start} + row_number() OVER (), 'name' || ({start} + row_number() OVER ()), ({start} + row_number() OVER ()) % 1000, ({start} + row_number() OVER ()) % 1000, ({start} + row_number() OVER ()) % 10, '2024-12-12'::DATE, repeat('x', 1000000) FROM generate_series(1, {batch_num}); - """), + """ + ), ) num_rows = 100_000 # out of 
memory with 200_000 rows @@ -402,34 +409,135 @@ def make_inserts(c: Composition, start: int, batch_num: int): # can partition across workers from the first read. c.testdrive( args=["--no-reset"], - input=dedent(""" + input=dedent( + """ $ postgres-execute connection=postgres://postgres:postgres@postgres ANALYZE products; - """), + """ + ), ) c.testdrive( args=["--no-reset"], - input=dedent(f""" + input=dedent( + f""" > CREATE SOURCE s1 FROM POSTGRES CONNECTION pg (PUBLICATION 'mz_source') > CREATE TABLE products FROM SOURCE s1 (REFERENCE products); > SELECT COUNT(*) FROM products; {num_rows} - """), + """ + ), ) make_inserts(c, num_rows, 1) c.testdrive( args=["--no-reset"], - input=dedent(f""" + input=dedent( + f""" > SELECT COUNT(*) FROM products; {num_rows + 1} - """), + """ + ), ) +def workflow_concurrent_drop_and_restart( + c: Composition, parser: WorkflowArgumentParser +) -> None: + """ + Regression test for a race condition where dropping a source concurrently + with the upstream PG connection dying can panic the remap operator. + + Root cause: FrontierUpper is reported from the timely dataflow capability + graph, which can be ahead of what persist has actually committed. The + controller uses this optimistic upper to compute step_back(upper) and + advances its dependency read hold on the remap shard accordingly. If + the source crashes (PG killed) before the persist write commits, + remap_since has already been advanced to step_back(optimistic_upper) = T, + but persist's actual data shard upper is still T (not T+1). On + SuspendAndRestart, as_of = max(step_back(T), remap_since=T) = T = + remap_upper, violating the invariant as_of < upper. 
+ + The race window is between: + - The controller processing a FrontierUpper that's ahead of persist + - The SuspendAndRestart reading the actual persist upper + + Killing PG while the source is actively ingesting maximizes the chance + of hitting this window: the dataflow capability advances (triggering + FrontierUpper) but PG dying prevents the persist write from committing. + """ + pg_version = get_targeted_pg_version(parser) + with c.override( + create_postgres(pg_version=pg_version), + Materialized( + volumes_extra=["secrets:/share/secrets"], + sanity_restart=False, + ), + ): + c.up("materialized", "postgres") + + c.run_testdrive_files( + "--no-reset", + f"--var=default-replica-size=scale=1,workers={Materialized.Size.DEFAULT_SIZE}", + "override/concurrent-drop-and-restart.td", + ) + + # Start a continuous stream of inserts so the source is actively + # writing when we kill PG. This maximizes the chance that the + # dataflow capability has advanced (FrontierUpper reported) but + # the persist write hasn't committed yet. + pg_conn = psycopg.connect( + host="localhost", + user="postgres", + password="postgres", + port=c.default_port("postgres"), + autocommit=True, + ) + for i in range(100): + pg_conn.execute( + f"INSERT INTO t1 SELECT generate_series({40001 + i * 1000}, {41000 + i * 1000}), 'streaming'".encode() + ) + time.sleep(0.01) + pg_conn.close() + + # Kill postgres while data is flowing. The source's dataflow + # capability may have advanced past what persist committed. + c.kill("postgres") + + # Drop the source concurrently with the health operator's 5s + # SuspendAndRestart delay. We want the DROP to race with the + # restart: the controller advances remap's dep hold based on + # the optimistic FrontierUpper, then the restart reads the + # actual (lower) persist upper. 
+ delay_ms = random.randint(4850, 5050) + print(f"==== delay_ms:{delay_ms}") + time.sleep(delay_ms / 1000.0) + + c.testdrive( + args=["--no-reset"], + input=dedent( + """ + > DROP SOURCE mz_source CASCADE + """ + ), + ) + + # Wait for the SuspendAndRestart to fire and any restart attempt + # to manifest. If the bug triggers, clusterd will panic. + time.sleep(10) + + # Check logs for the panic. + logs = c.invoke("logs", "materialized", capture=True) + log_output = logs.stdout + if "invalid as_of: upper" in log_output: + raise RuntimeError( + f"Remap shard panic detected (delay={delay_ms}ms): " + "since == upper race condition triggered" + ) + + + def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None: def process(name: str) -> None: if name in ("default", "large-scale"): diff --git a/test/pg-cdc/override/concurrent-drop-and-restart.td b/test/pg-cdc/override/concurrent-drop-and-restart.td new file mode 100644 index 0000000000000..6c411fb509813 --- /dev/null +++ b/test/pg-cdc/override/concurrent-drop-and-restart.td @@ -0,0 +1,67 @@ +# Copyright Materialize, Inc. and contributors. All rights reserved. +# +# Use of this software is governed by the Business Source License +# included in the LICENSE file. +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0. + +# Test for a race condition where dropping a source concurrently with +# the upstream postgres connection dying can cause a panic in the remap +# operator. +# +# Root cause: FrontierUpper is reported from the timely dataflow capability +# graph, which can be AHEAD of what persist has actually committed. The +# controller uses this optimistic upper to compute step_back(upper) and +# advances its dependency read hold on the remap shard accordingly. 
If +# the source crashes before the persist write commits, remap_since has +# already been advanced to step_back(optimistic_upper) = T, but persist's +# actual data shard upper is still T (not T+1). On SuspendAndRestart, +# as_of = max(step_back(T), remap_since=T) = T = remap_upper, violating +# the invariant as_of < upper. + +> CREATE SECRET pgpass AS 'postgres' + +> CREATE CONNECTION pg TO POSTGRES ( + HOST postgres, + DATABASE postgres, + USER postgres, + PASSWORD SECRET pgpass + ) + +> CREATE CLUSTER cdc_cluster SIZE '${arg.default-replica-size}' + +$ postgres-execute connection=postgres://postgres:postgres@postgres +ALTER USER postgres WITH replication; +DROP SCHEMA IF EXISTS public CASCADE; +CREATE SCHEMA public; + +DROP PUBLICATION IF EXISTS mz_source; +CREATE PUBLICATION mz_source FOR ALL TABLES; + +CREATE TABLE t1 (pk INTEGER PRIMARY KEY, f2 TEXT); +ALTER TABLE t1 REPLICA IDENTITY FULL; +INSERT INTO t1 SELECT generate_series(1, 1000), 'data'; + +> BEGIN +> CREATE SOURCE mz_source + IN CLUSTER cdc_cluster + FROM POSTGRES CONNECTION pg (PUBLICATION 'mz_source') +> CREATE TABLE t1 FROM SOURCE mz_source (REFERENCE t1) +> COMMIT + +# Wait for initial snapshot to complete and data to be visible. +> SELECT count(*) FROM t1 +1000 + +# Keep inserting data so the source is actively writing when we kill PG. +# This maximizes the chance that the dataflow capability advances (and +# FrontierUpper is reported) before the persist write commits, creating +# the window where the controller's dep hold on remap is ahead of +# persist's actual upper. +$ postgres-execute connection=postgres://postgres:postgres@postgres +INSERT INTO t1 SELECT generate_series(1001, 40000), 'more_data'; + +> SELECT count(*) FROM t1 +40000