Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions mtr/binlog_streaming/r/binlog_flush.result
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@

*** Resetting replication at the very beginning of the test.

*** Creating non-GTID transactions in one open binlog.
CREATE TABLE t1(id INT UNSIGNED NOT NULL AUTO_INCREMENT, PRIMARY KEY(id)) ENGINE=InnoDB;
FLUSH BINARY LOGS;

*** Starting pull mode without checkpointing and before any source binlog rotation.

*** Generating a configuration file in JSON format for the Binlog
*** Server utility.

*** Determining binlog file directory from the server.

*** Creating a temporary directory <BINSRV_STORAGE_PATH> for storing
*** binlog files downloaded via the Binlog Server utility.
include/read_file_to_var.inc

*** Waiting for the source-side Binlog Dump thread to report that all
*** currently available events have been streamed to the Binlog Server
*** utility.

*** Capturing storage object size before checkpoint, rotate, timeout, or graceful shutdown.
include/read_file_to_var.inc

*** Terminating pull mode without giving it a chance to flush on graceful shutdown.

*** Removing the Binlog Server utility storage directory.

*** Removing the Binlog Server utility log file.

*** Removing the Binlog Server utility configuration file.
DROP TABLE t1;

*** Verifying storage stayed at magic payload size before any valid flush trigger.
7 changes: 7 additions & 0 deletions mtr/binlog_streaming/t/binlog_flush.combinations
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
[gtid_on]
enforce-gtid-consistency = ON
gtid-mode = ON

[gtid_off]
enforce-gtid-consistency = OFF
gtid-mode = OFF
115 changes: 115 additions & 0 deletions mtr/binlog_streaming/t/binlog_flush.test
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
# Purpose: validate that PBS does not flush its internal binlog event buffer
# to storage every time a whole transaction has been replicated. The test
# streams a batch of transactions with checkpointing left unconfigured and
# no source binlog rotation, kills the utility before any read timeout or
# graceful shutdown, and then requires that the storage object for the
# current binlog is still exactly 4 bytes (the "magic payload" size).

--source ../include/have_binsrv.inc

--source ../include/v80_v84_compatibility_defines.inc
# Paired with include/wait_until_count_sessions.inc near the end of the test.
--source include/count_sessions.inc

# identifying backend storage type ('file' or 's3')
--source ../include/identify_storage_backend.inc

--echo
--echo *** Resetting replication at the very beginning of the test.
# The statement text is kept out of the recorded output because it comes from
# a variable (presumably set by v80_v84_compatibility_defines.inc and
# different between server versions - TODO confirm).
--disable_query_log
eval $stmt_reset_binary_logs_and_gtids;
--enable_query_log

--echo
--echo *** Creating non-GTID transactions in one open binlog.
# NOTE(review): binlog_flush.combinations also runs this test with
# gtid-mode = ON, so "non-GTID" here presumably refers to the position-based
# replication mode selected below rather than to the server setting - confirm.
CREATE TABLE t1(id INT UNSIGNED NOT NULL AUTO_INCREMENT, PRIMARY KEY(id)) ENGINE=InnoDB;
# Rotate so that the transactions generated next land in a fresh binlog file;
# its name is remembered because the storage object with the same name is
# measured later in the test.
FLUSH BINARY LOGS;
--let $binlog_name = query_get_value($stmt_show_binary_log_status, File, 1)

# Populate the currently open binlog with 20 single-row INSERT statements.
# Query logging is switched off so the statements do not appear in the
# recorded result file.
--disable_query_log
--let $remaining_inserts = 20
while ($remaining_inserts)
{
  INSERT INTO t1 VALUES(DEFAULT);
  --dec $remaining_inserts
}
--enable_query_log

--echo
--echo *** Starting pull mode without checkpointing and before any source binlog rotation.
# Parameters consumed by ../include/set_up_binsrv_environment.inc.
--let $binsrv_connect_timeout = 10
# NOTE(review): the read timeout is presumably kept large so that the
# read-timeout flush cannot fire before the storage size is captured - confirm.
--let $binsrv_read_timeout = 100
--let $binsrv_idle_time = 1
--let $binsrv_verify_checksum = TRUE
--let $binsrv_replication_mode = position
# Both checkpoint settings are intentionally left empty ("without
# checkpointing" above); presumably the include then omits them from the
# generated configuration - TODO confirm.
--let $binsrv_checkpoint_size =
--let $binsrv_checkpoint_interval =
--source ../include/set_up_binsrv_environment.inc

# Start the utility in the background through a shell command line that
# redirects its output to the log file and writes the PID of the background
# job ($!) to a pid file, so that the test can later kill it.
--let $binsrv_pid_file = $MYSQL_TMP_DIR/binsrv_non_gtid_flush_bug.pid
--let $binsrv_spawn_cmd_line = $BINSRV pull $binsrv_config_file_path > $binsrv_log_path 2>&1 & echo \$! > $binsrv_pid_file
# The command line is handed to the Perl snippet via the environment.
--let EXPORTED_BINSRV_SPAWN_CMD_LINE = $binsrv_spawn_cmd_line
--perl
use strict;
use warnings;
my $cmd = $ENV{'EXPORTED_BINSRV_SPAWN_CMD_LINE'};
system("$cmd");
EOF

# Read the recorded PID back into a mysqltest variable.
--let $read_from_file = $binsrv_pid_file
--source include/read_file_to_var.inc
--let $binsrv_pid = $result

--echo
--echo *** Waiting for the source-side Binlog Dump thread to report that all
--echo *** currently available events have been streamed to the Binlog Server
--echo *** utility.
# We deliberately avoid waiting for PBS to log "entering idle mode for":
# that message appears only after the MySQL read timeout, and the
# read-timeout path itself flushes the event buffer (see
# receive_binlog_events()), which would invalidate the "storage stayed at
# magic payload size before any flush trigger" invariant that this test
# asserts. Instead, the source's processlist is polled until a Binlog Dump
# thread reports a state containing "sent all binlog to".
let $wait_condition = SELECT COUNT(*) FROM performance_schema.processlist
WHERE COMMAND = 'Binlog Dump'
AND STATE LIKE '%sent all binlog to%';
--let $wait_timeout = 60
--source include/wait_condition.inc

--echo
--echo *** Capturing storage object size before checkpoint, rotate, timeout, or graceful shutdown.
# The size is written to a temporary file by an external command and then
# read back into a mysqltest variable; the actual check is done at the very
# end of the test, after cleanup.
--let $size_file = $MYSQL_TMP_DIR/non_gtid_flush_bug_size
if ($storage_backend == file)
{
# Local file backend: byte count of the binlog file in the storage directory.
--exec /bin/sh -c "wc -c < '$binsrv_storage_path/$binlog_name' > '$size_file'"
}
if ($storage_backend == s3)
{
# S3 backend: ContentLength of the object; if the AWS CLI call fails,
# 0 is recorded instead.
--exec $aws_cli s3api head-object --bucket $MTR_BINSRV_AWS_S3_BUCKET --key $binsrv_storage_path/$binlog_name --query ContentLength --output text > $size_file 2>/dev/null || echo 0 > $size_file
}
--let $read_from_file = $size_file
--source include/read_file_to_var.inc
--let $storage_object_size_before_flush = $result
--remove_file $size_file

--echo
--echo *** Terminating pull mode without giving it a chance to flush on graceful shutdown.
# SIGKILL cannot be handled by the process, so no shutdown-time flush can run.
--exec kill -9 $binsrv_pid
--remove_file $binsrv_pid_file

# Clean up any remaining binlog dump connection left by the killed pull process.
--let $binlog_dump_connection_id = `SELECT ID FROM performance_schema.processlist WHERE COMMAND = 'Binlog Dump'`
if ($binlog_dump_connection_id != '')
{
# The connection id differs between runs, so the statement is kept out of
# the recorded output.
--disable_query_log
--replace_result $binlog_dump_connection_id <CONNECTION_ID>
eval KILL CONNECTION $binlog_dump_connection_id;
--enable_query_log
}
# Presumably waits until the session count is back to the value recorded by
# include/count_sessions.inc at the start of the test - TODO confirm.
--source include/wait_until_count_sessions.inc

--source ../include/tear_down_binsrv_environment.inc

# Teardown and DROP TABLE run before the final size check so that the
# environment is cleaned up even when that check aborts the test.
DROP TABLE t1;

--echo
--echo *** Verifying storage stayed at magic payload size before any valid flush trigger.
# The size captured before the utility was killed must be exactly 4 bytes.
# Any other value fails the test: a larger value means buffered events were
# flushed without a valid trigger, while 0 can also come from the S3 fallback
# used when the object size could not be obtained.
if ($storage_object_size_before_flush != 4)
{
  # Printed only on failure (so the recorded result file is unaffected):
  # the abort message alone does not reveal which size was observed.
  --echo *** Unexpected storage object size: $storage_object_size_before_flush (expected 4)
  --die storage object grew before checkpoint, rotate, timeout, or close; non-GTID transaction boundary flushed storage
}
4 changes: 3 additions & 1 deletion src/app.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -542,7 +542,9 @@ void process_binlog_event(const binsrv::events::event_view &current_event_v,
}

// processing the very last event in the sequence - either a non-artificial
// ROTATE event or a STOP event
// ROTATE event or a STOP event. This is the path that closes the local
// binlog file and (via storage::close_binlog -> flush_event_buffer) is
// what guarantees the terminator event itself lands on the backend
if ((code == binsrv::events::code_type::rotate && !is_artificial) ||
code == binsrv::events::code_type::stop) {
process_rotate_or_stop_event(logger, storage);
Expand Down
39 changes: 17 additions & 22 deletions src/binsrv/storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -250,34 +250,27 @@ void storage::write_event(util::const_byte_span event_data,
}

// now we are writing data from the event buffer to the storage backend if
// event buffer has some data in it that can be considered a complete
// the event buffer has some data in it that can be considered a complete
// transaction and a checkpoint event (either size-based or time-based)
// occurred or we are processing the very last event in the binlog file
// occurred. The file-boundary flush is handled separately, in
// close_binlog().

if (has_event_data_to_flush()) {
const auto ready_to_flush_position{get_ready_to_flush_position()};
const auto now_ts{std::chrono::steady_clock::now()};

bool needs_flush{false};
if (at_transaction_boundary && transaction_gtid.is_empty()) {
// a special combination of parameters when at_transaction_boundary is
// true and transaction_gtid is empty means that we received either ROTATE
// or STOP event at the very end of a binary log file - in this case we
// need to flush the event data buffer immediately regardless of whether
// one of the checkpointing events occurred or not
needs_flush = true;
} else {
// here we perform size-based checkpointing calculations based on
// calculated "ready_to_flush_position" instead of
// "get_current_position()" directly to take into account that some event
// data may remain buffered
needs_flush = (size_checkpointing_enabled() &&
(ready_to_flush_position >=
last_checkpoint_position_ + checkpoint_size_bytes_)) ||
(interval_checkpointing_enabled() &&
(now_ts >= last_checkpoint_timestamp_ +
checkpoint_interval_seconds_));
}
// here we perform size-based checkpointing calculations based on the
// calculated "ready_to_flush_position" instead of
// "get_current_position()" directly to take into account that some event
// data may remain buffered
const bool needs_flush{
(size_checkpointing_enabled() &&
(ready_to_flush_position >=
last_checkpoint_position_ + checkpoint_size_bytes_)) ||
(interval_checkpointing_enabled() &&
(now_ts >=
last_checkpoint_timestamp_ + checkpoint_interval_seconds_))};

if (needs_flush) {
flush_event_buffer_internal();

Expand All @@ -290,6 +283,8 @@ void storage::write_event(util::const_byte_span event_data,
void storage::close_binlog() {
ensure_streaming_mode();

// This flush is the only path that guarantees the file-final ROTATE/STOP
// event lands on the backend.
flush_event_buffer();
event_buffer_.clear();
event_buffer_.shrink_to_fit();
Expand Down
Loading