From c157c64ac43de162f794fdd6020931816460c3b9 Mon Sep 17 00:00:00 2001 From: Kamil Holubicki Date: Wed, 20 May 2026 08:48:52 +0200 Subject: [PATCH] PS-11136: non-GTID transactions cause one storage flush per transaction, bypassing size/interval checkpointing https://perconadev.atlassian.net/browse/PS-11136 Problem: In non-GTID (anonymous transaction) replication mode, PBS flushes its in-memory event buffer to the storage backend on every transaction boundary, ignoring the configured 'checkpoint_size_bytes' and 'checkpoint_interval_seconds' thresholds. For object-store backends this turns into one PUT per transaction. Cause: 'storage::write_event()' had a fast-path keyed on 'at_transaction_boundary && transaction_gtid.is_empty()' whose intent was "flush regardless of thresholds because this is the file-final ROTATE/STOP event". The condition wasn't tight enough: anonymous transactions also satisfy it (they never populate 'transaction_gtid_'), so every XID terminating an anonymous transaction was misidentified as a file terminator and forced a synchronous flush. Solution: Removed the fast-path. The file-final ROTATE/STOP event is still flushed - just through the already-existing 'storage::close_binlog()' call on the 'process_rotate_or_stop_event()' / artificial-rotate rename paths, which is the natural place for a file-boundary flush. GTID-mode behavior is unchanged. --- mtr/binlog_streaming/r/binlog_flush.result | 35 ++++++ .../t/binlog_flush.combinations | 7 ++ mtr/binlog_streaming/t/binlog_flush.test | 115 ++++++++++++++++++ src/app.cpp | 4 +- src/binsrv/storage.cpp | 39 +++--- 5 files changed, 177 insertions(+), 23 deletions(-) create mode 100644 mtr/binlog_streaming/r/binlog_flush.result create mode 100644 mtr/binlog_streaming/t/binlog_flush.combinations create mode 100644 mtr/binlog_streaming/t/binlog_flush.test diff --git a/mtr/binlog_streaming/r/binlog_flush.result b/mtr/binlog_streaming/r/binlog_flush.result new file mode 100644 index 0000000..401284b --- /dev/null +++ b/mtr/binlog_streaming/r/binlog_flush.result @@ -0,0 +1,35 @@ + +*** Resetting replication at the very beginning of the test. + +*** Creating non-GTID transactions in one open binlog. +CREATE TABLE t1(id INT UNSIGNED NOT NULL AUTO_INCREMENT, PRIMARY KEY(id)) ENGINE=InnoDB; +FLUSH BINARY LOGS; + +*** Starting pull mode without checkpointing and before any source binlog rotation. + +*** Generating a configuration file in JSON format for the Binlog +*** Server utility. + +*** Determining binlog file directory from the server. + +*** Creating a temporary directory for storing +*** binlog files downloaded via the Binlog Server utility. +include/read_file_to_var.inc + +*** Waiting for the source-side Binlog Dump thread to report that all +*** currently available events have been streamed to the Binlog Server +*** utility. + +*** Capturing storage object size before checkpoint, rotate, timeout, or graceful shutdown. +include/read_file_to_var.inc + +*** Terminating pull mode without giving it a chance to flush on graceful shutdown. + +*** Removing the Binlog Server utility storage directory. + +*** Removing the Binlog Server utility log file. + +*** Removing the Binlog Server utility configuration file. +DROP TABLE t1; + +*** Verifying storage stayed at magic payload size before any valid flush trigger. diff --git a/mtr/binlog_streaming/t/binlog_flush.combinations b/mtr/binlog_streaming/t/binlog_flush.combinations new file mode 100644 index 0000000..b1f8b51 --- /dev/null +++ b/mtr/binlog_streaming/t/binlog_flush.combinations @@ -0,0 +1,7 @@ +[gtid_on] +enforce-gtid-consistency = ON +gtid-mode = ON + +[gtid_off] +enforce-gtid-consistency = OFF +gtid-mode = OFF diff --git a/mtr/binlog_streaming/t/binlog_flush.test b/mtr/binlog_streaming/t/binlog_flush.test new file mode 100644 index 0000000..a37d7d6 --- /dev/null +++ b/mtr/binlog_streaming/t/binlog_flush.test @@ -0,0 +1,115 @@ +# The purpose of the test is to validate that PBS doesn't flush its internal binlog buffer +# to the file every time the whole transaction is replicated. + +--source ../include/have_binsrv.inc + +--source ../include/v80_v84_compatibility_defines.inc +--source include/count_sessions.inc + +# identifying backend storage type ('file' or 's3') +--source ../include/identify_storage_backend.inc + +--echo +--echo *** Resetting replication at the very beginning of the test. +--disable_query_log +eval $stmt_reset_binary_logs_and_gtids; +--enable_query_log + +--echo +--echo *** Creating non-GTID transactions in one open binlog. +CREATE TABLE t1(id INT UNSIGNED NOT NULL AUTO_INCREMENT, PRIMARY KEY(id)) ENGINE=InnoDB; +FLUSH BINARY LOGS; +--let $binlog_name = query_get_value($stmt_show_binary_log_status, File, 1) + +--disable_query_log +--let $transaction_idx = 0 +while ($transaction_idx < 20) +{ + INSERT INTO t1 VALUES(DEFAULT); + --inc $transaction_idx +} +--enable_query_log + +--echo +--echo *** Starting pull mode without checkpointing and before any source binlog rotation. +--let $binsrv_connect_timeout = 10 +--let $binsrv_read_timeout = 100 +--let $binsrv_idle_time = 1 +--let $binsrv_verify_checksum = TRUE +--let $binsrv_replication_mode = position +--let $binsrv_checkpoint_size = +--let $binsrv_checkpoint_interval = +--source ../include/set_up_binsrv_environment.inc + +--let $binsrv_pid_file = $MYSQL_TMP_DIR/binsrv_non_gtid_flush_bug.pid +--let $binsrv_spawn_cmd_line = $BINSRV pull $binsrv_config_file_path > $binsrv_log_path 2>&1 & echo \$! > $binsrv_pid_file +--let EXPORTED_BINSRV_SPAWN_CMD_LINE = $binsrv_spawn_cmd_line +--perl + use strict; + use warnings; + my $cmd = $ENV{'EXPORTED_BINSRV_SPAWN_CMD_LINE'}; + system("$cmd"); +EOF + +--let $read_from_file = $binsrv_pid_file +--source include/read_file_to_var.inc +--let $binsrv_pid = $result + +--echo +--echo *** Waiting for the source-side Binlog Dump thread to report that all +--echo *** currently available events have been streamed to the Binlog Server +--echo *** utility. +# We deliberately avoid waiting for PBS to log "entering +# idle mode for" because that pattern fires only after the MySQL +# read timeout, and the read-timeout path itself flushes the event +# buffer (see receive_binlog_events()) - which would invalidate the +# "storage stayed empty before any flush trigger" invariant this +# test asserts. +let $wait_condition = SELECT COUNT(*) FROM performance_schema.processlist + WHERE COMMAND = 'Binlog Dump' + AND STATE LIKE '%sent all binlog to%'; +--let $wait_timeout = 60 +--source include/wait_condition.inc + +--echo +--echo *** Capturing storage object size before checkpoint, rotate, timeout, or graceful shutdown. +--let $size_file = $MYSQL_TMP_DIR/non_gtid_flush_bug_size +if ($storage_backend == file) +{ + --exec /bin/sh -c "wc -c < '$binsrv_storage_path/$binlog_name' > '$size_file'" +} +if ($storage_backend == s3) +{ + --exec $aws_cli s3api head-object --bucket $MTR_BINSRV_AWS_S3_BUCKET --key $binsrv_storage_path/$binlog_name --query ContentLength --output text > $size_file 2>/dev/null || echo 0 > $size_file +} +--let $read_from_file = $size_file +--source include/read_file_to_var.inc +--let $storage_object_size_before_flush = $result +--remove_file $size_file + +--echo +--echo *** Terminating pull mode without giving it a chance to flush on graceful shutdown. +--exec kill -9 $binsrv_pid +--remove_file $binsrv_pid_file + +# Clean up any remaining binlog dump connection left by the killed pull process. +--let $binlog_dump_connection_id = `SELECT ID FROM performance_schema.processlist WHERE COMMAND = 'Binlog Dump'` +if ($binlog_dump_connection_id != '') +{ + --disable_query_log + --replace_result $binlog_dump_connection_id + eval KILL CONNECTION $binlog_dump_connection_id; + --enable_query_log +} +--source include/wait_until_count_sessions.inc + +--source ../include/tear_down_binsrv_environment.inc + +DROP TABLE t1; + +--echo +--echo *** Verifying storage stayed at magic payload size before any valid flush trigger. +if ($storage_object_size_before_flush != 4) +{ + --die storage object grew before checkpoint, rotate, timeout, or close; non-GTID transaction boundary flushed storage +} diff --git a/src/app.cpp b/src/app.cpp index 7815ffc..0f8a5a1 100644 --- a/src/app.cpp +++ b/src/app.cpp @@ -542,7 +542,9 @@ void process_binlog_event(const binsrv::events::event_view ¤t_event_v, } // processing the very last event in the sequence - either a non-artificial - // ROTATE event or a STOP event + // ROTATE event or a STOP event. This is the path that closes the local + // binlog file and (via storage::close_binlog -> flush_event_buffer) is + // what guarantees the terminator event itself lands on the backend if ((code == binsrv::events::code_type::rotate && !is_artificial) || code == binsrv::events::code_type::stop) { process_rotate_or_stop_event(logger, storage); diff --git a/src/binsrv/storage.cpp b/src/binsrv/storage.cpp index 5beb67d..2cdca86 100644 --- a/src/binsrv/storage.cpp +++ b/src/binsrv/storage.cpp @@ -250,34 +250,27 @@ void storage::write_event(util::const_byte_span event_data, } // now we are writing data from the event buffer to the storage backend if - // event buffer has some data in it that can be considered a complete + // the event buffer has some data in it that can be considered a complete // transaction and a checkpoint event (either size-based or time-based) - // occurred or we are processing the very last event in the binlog file + // occurred. The file-boundary flush is handled separately, in + // close_binlog(). if (has_event_data_to_flush()) { const auto ready_to_flush_position{get_ready_to_flush_position()}; const auto now_ts{std::chrono::steady_clock::now()}; - bool needs_flush{false}; - if (at_transaction_boundary && transaction_gtid.is_empty()) { - // a special combination of parameters when at_transaction_boundary is - // true and transaction_gtid is empty means that we received either ROTATE - // or STOP event at the very end of a binary log file - in this case we - // need to flush the event data buffer immediately regardless of whether - // one of the checkpointing events occurred or not - needs_flush = true; - } else { - // here we perform size-based checkpointing calculations based on - // calculated "ready_to_flush_position" instead of - // "get_current_position()" directly to take into account that some event - // data may remain buffered - needs_flush = (size_checkpointing_enabled() && - (ready_to_flush_position >= - last_checkpoint_position_ + checkpoint_size_bytes_)) || - (interval_checkpointing_enabled() && - (now_ts >= last_checkpoint_timestamp_ + - checkpoint_interval_seconds_)); - } + // here we perform size-based checkpointing calculations based on the + // calculated "ready_to_flush_position" instead of + // "get_current_position()" directly to take into account that some event + // data may remain buffered + const bool needs_flush{ + (size_checkpointing_enabled() && + (ready_to_flush_position >= + last_checkpoint_position_ + checkpoint_size_bytes_)) || + (interval_checkpointing_enabled() && + (now_ts >= + last_checkpoint_timestamp_ + checkpoint_interval_seconds_))}; + if (needs_flush) { flush_event_buffer_internal(); @@ -290,6 +283,8 @@ void storage::write_event(util::const_byte_span event_data, void storage::close_binlog() { ensure_streaming_mode(); + // This flush is the only path that guarantees the file-final ROTATE/STOP + // event lands on the backend. flush_event_buffer(); event_buffer_.clear(); event_buffer_.shrink_to_fit();