Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/storage/src/storage_state/async_storage_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,9 @@ impl<T: Timestamp + TimestampManipulation + Lattice + Codec64 + Display + Sync>
}
}
}
tracing::info!(
"== as_of:{as_of:?} since:{remap_since:?} resume_uppers:{resume_uppers:?}"
);

/// Convenience function to convert `BTreeMap<GlobalId, Antichain<C>>` to
/// `BTreeMap<GlobalId, Vec<Row>>`.
Expand Down
128 changes: 118 additions & 10 deletions test/pg-cdc/mzcompose.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
"""

import glob
import random
import time
from textwrap import dedent

Expand Down Expand Up @@ -364,7 +365,9 @@ def workflow_large_scale(c: Composition, parser: WorkflowArgumentParser) -> None

# Set up the Postgres server with the initial records, set up the connection to
# the Postgres server in Materialize.
c.testdrive(dedent("""
c.testdrive(
dedent(
"""
$ postgres-execute connection=postgres://postgres:postgres@postgres
ALTER USER postgres WITH replication;
DROP SCHEMA IF EXISTS public CASCADE;
Expand All @@ -381,15 +384,19 @@ def workflow_large_scale(c: Composition, parser: WorkflowArgumentParser) -> None
CREATE PUBLICATION mz_source FOR ALL TABLES;

> DROP SOURCE IF EXISTS s1 CASCADE;
"""))
"""
)
)

def make_inserts(c: Composition, start: int, batch_num: int):
c.testdrive(
args=["--no-reset"],
input=dedent(f"""
input=dedent(
f"""
$ postgres-execute connection=postgres://postgres:postgres@postgres
INSERT INTO products (id, name, merchant_id, price, status, created_at, recordSizePayload) SELECT {start} + row_number() OVER (), 'name' || ({start} + row_number() OVER ()), ({start} + row_number() OVER ()) % 1000, ({start} + row_number() OVER ()) % 1000, ({start} + row_number() OVER ()) % 10, '2024-12-12'::DATE, repeat('x', 1000000) FROM generate_series(1, {batch_num});
"""),
"""
),
)

num_rows = 100_000 # out of memory with 200_000 rows
Expand All @@ -402,34 +409,135 @@ def make_inserts(c: Composition, start: int, batch_num: int):
# can partition across workers from the first read.
c.testdrive(
args=["--no-reset"],
input=dedent("""
input=dedent(
"""
$ postgres-execute connection=postgres://postgres:postgres@postgres
ANALYZE products;
"""),
"""
),
)

c.testdrive(
args=["--no-reset"],
input=dedent(f"""
input=dedent(
f"""
> CREATE SOURCE s1
FROM POSTGRES CONNECTION pg (PUBLICATION 'mz_source')
> CREATE TABLE products FROM SOURCE s1 (REFERENCE products);
> SELECT COUNT(*) FROM products;
{num_rows}
"""),
"""
),
)

make_inserts(c, num_rows, 1)

c.testdrive(
args=["--no-reset"],
input=dedent(f"""
input=dedent(
f"""
> SELECT COUNT(*) FROM products;
{num_rows + 1}
"""),
"""
),
)


def workflow_concurrent_drop_and_restart(
c: Composition, parser: WorkflowArgumentParser
) -> None:
"""
Regression test for a race condition where dropping a source concurrently
with the upstream PG connection dying can panic the remap operator.

Root cause: FrontierUpper is reported from the timely dataflow capability
graph, which can be ahead of what persist has actually committed. The
controller uses this optimistic upper to compute step_back(upper) and
advances its dependency read hold on the remap shard accordingly. If
the source crashes (PG killed) before the persist write commits,
remap_since has already been advanced to step_back(optimistic_upper) = T,
but persist's actual data shard upper is still T (not T+1). On
SuspendAndRestart, as_of = max(step_back(T), remap_since=T) = T =
remap_upper, violating the invariant as_of < upper.

The race window is between:
- The controller processing a FrontierUpper that's ahead of persist
- The SuspendAndRestart reading the actual persist upper

Killing PG while the source is actively ingesting maximizes the chance
of hitting this window: the dataflow capability advances (triggering
FrontierUpper) but PG dying prevents the persist write from committing.
"""
pg_version = get_targeted_pg_version(parser)
with c.override(
create_postgres(pg_version=pg_version),
Materialized(
volumes_extra=["secrets:/share/secrets"],
sanity_restart=False,
),
):
c.up("materialized", "postgres")

c.run_testdrive_files(
"--no-reset",
f"--var=default-replica-size=scale=1,workers={Materialized.Size.DEFAULT_SIZE}",
"override/concurrent-drop-and-restart.td",
)

# Start a continuous stream of inserts so the source is actively
# writing when we kill PG. This maximizes the chance that the
# dataflow capability has advanced (FrontierUpper reported) but
# the persist write hasn't committed yet.
pg_conn = psycopg.connect(
host="localhost",
user="postgres",
password="postgres",
port=c.default_port("postgres"),
autocommit=True,
)
for i in range(100):
pg_conn.execute(
f"INSERT INTO t1 SELECT generate_series({40001 + i * 1000}, {41000 + i * 1000}), 'streaming'".encode()
)
time.sleep(0.01)
pg_conn.close()

# Kill postgres while data is flowing. The source's dataflow
# capability may have advanced past what persist committed.
c.kill("postgres")

# Drop the source concurrently with the health operator's 5s
# SuspendAndRestart delay. We want the DROP to race with the
# restart: the controller advances remap's dep hold based on
# the optimistic FrontierUpper, then the restart reads the
# actual (lower) persist upper.
delay_ms = random.randint(4850, 5050)
print(f"==== delay_ms:{delay_ms}")
time.sleep(delay_ms / 1000.0)

c.testdrive(
args=["--no-reset"],
input=dedent(
"""
> DROP SOURCE mz_source CASCADE
"""
),
)

# Wait for the SuspendAndRestart to fire and any restart attempt
# to manifest. If the bug triggers, clusterd will panic.
time.sleep(10)

# # Check logs for the panic.
logs = c.invoke("logs", "materialized", capture=True)
log_output = logs.stdout
if "invalid as_of: upper" in log_output:
raise RuntimeError(
f"Remap shard panic detected (delay={delay_ms}ms): "
"since == upper race condition triggered"
)


def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
def process(name: str) -> None:
if name in ("default", "large-scale"):
Expand Down
67 changes: 67 additions & 0 deletions test/pg-cdc/override/concurrent-drop-and-restart.td
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

# Test for a race condition where dropping a source concurrently with
# the upstream postgres connection dying can cause a panic in the remap
# operator.
#
# Root cause: FrontierUpper is reported from the timely dataflow capability
# graph, which can be AHEAD of what persist has actually committed. The
# controller uses this optimistic upper to compute step_back(upper) and
# advances its dependency read hold on the remap shard accordingly. If
# the source crashes before the persist write commits, remap_since has
# already been advanced to step_back(optimistic_upper) = T, but persist's
# actual data shard upper is still T (not T+1). On SuspendAndRestart,
# as_of = max(step_back(T), remap_since=T) = T = remap_upper, violating
# the invariant as_of < upper.

> CREATE SECRET pgpass AS 'postgres'

> CREATE CONNECTION pg TO POSTGRES (
HOST postgres,
DATABASE postgres,
USER postgres,
PASSWORD SECRET pgpass
)

> CREATE CLUSTER cdc_cluster SIZE '${arg.default-replica-size}'

$ postgres-execute connection=postgres://postgres:postgres@postgres
ALTER USER postgres WITH replication;
DROP SCHEMA IF EXISTS public CASCADE;
CREATE SCHEMA public;

DROP PUBLICATION IF EXISTS mz_source;
CREATE PUBLICATION mz_source FOR ALL TABLES;

CREATE TABLE t1 (pk INTEGER PRIMARY KEY, f2 TEXT);
ALTER TABLE t1 REPLICA IDENTITY FULL;
INSERT INTO t1 SELECT generate_series(1, 1000), 'data';

> BEGIN
> CREATE SOURCE mz_source
IN CLUSTER cdc_cluster
FROM POSTGRES CONNECTION pg (PUBLICATION 'mz_source')
> CREATE TABLE t1 FROM SOURCE mz_source (REFERENCE t1)
> COMMIT

# Wait for initial snapshot to complete and data to be visible.
> SELECT count(*) FROM t1
1000

# Keep inserting data so the source is actively writing when we kill PG.
# This maximizes the chance that the dataflow capability advances (and
# FrontierUpper is reported) before the persist write commits, creating
# the window where the controller's dep hold on remap is ahead of
# persist's actual upper.
$ postgres-execute connection=postgres://postgres:postgres@postgres
INSERT INTO t1 SELECT generate_series(1001, 40000), 'more_data';

> SELECT count(*) FROM t1
40000
Loading