From 3f4af71662e433cc29155f25e72afabb77ff09de Mon Sep 17 00:00:00 2001 From: Yura Sorokin Date: Fri, 22 May 2026 01:19:44 +0200 Subject: [PATCH] PS-11080 fix: Spoiled logical clock information in rewritten binlog (part 3) https://perconadev.atlassian.net/browse/PS-11080 Implemented generic event rewriting mechanism ('binsrv::events::rewriter::rewrite()' static method) that performs the following operations. * For GTID_LOG /ANONYMOUS_GTID_LOG it changes 'sequence_number' / 'last_committed' fields in the post header and relocates the event (changes 'next_event_position' in the common header). * For GTID_TAGGED_LOG it changes 'sequence_number' / 'last_committed' in the serializer-encoded event body, which because of the variable length encoding of individual elements in the archive may change its size and may require updating 'transaction_length' in the event body and 'event_size' in the common header. In addition, it also performs event relocation. * For every other event it simply performs event relocation. All these changes are performed under a special guard ('binsrv::events::event_updatable_view::write_proxy') which guarantees that the event checksum will be recalculated / added upon finalizing all field value updates. We also make sure that all rewritten events will have a footer with properly calculated checksum (event if it was not present in the original event). The logic for updating 'sequence_number' / 'last_committed' in GTID events is encapsulated inside ('fix_sequence_number_and_last_committed()' function). Refactored the way how several event fields can be modified in a single run via 'event_updatable_view' with guarantee that event's checksum will be properly recalculated. Introduced 'rewriter::generic_materialize()' template function that accepts generic field modification functor. Also introduced two simplified helper functions (which use the generic one underneath) for the most typical cases: * 'rewriter::materialize()' * 'rewriter::materialize_and_relocate()' Co-authored-by: Kamil Holubicki --- CMakeLists.txt | 4 + src/app.cpp | 14 +- src/binsrv/events/event_view.cpp | 69 ---- src/binsrv/events/event_view.hpp | 5 +- src/binsrv/events/event_view_fwd.hpp | 9 - src/binsrv/events/gtid_log_post_header.hpp | 6 + .../events/gtid_tagged_log_body_impl.cpp | 4 +- .../events/gtid_tagged_log_body_impl.hpp | 6 + src/binsrv/events/rewriter.cpp | 330 ++++++++++++++++++ src/binsrv/events/rewriter.hpp | 128 +++++++ src/binsrv/events/rewriter_fwd.hpp | 33 ++ tests/CMakeLists.txt | 8 +- tests/event_test.cpp | 13 +- 13 files changed, 528 insertions(+), 101 deletions(-) create mode 100644 src/binsrv/events/rewriter.cpp create mode 100644 src/binsrv/events/rewriter.hpp create mode 100644 src/binsrv/events/rewriter_fwd.hpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 42f035c..0d60fa9 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -188,6 +188,10 @@ set(source_files src/binsrv/events/reader_context.hpp src/binsrv/events/reader_context.cpp + src/binsrv/events/rewriter_fwd.hpp + src/binsrv/events/rewriter.hpp + src/binsrv/events/rewriter.cpp + src/binsrv/events/rotate_body_impl_fwd.hpp src/binsrv/events/rotate_body_impl.hpp src/binsrv/events/rotate_body_impl.cpp diff --git a/src/app.cpp b/src/app.cpp index 1b10bc1..4076ac5 100644 --- a/src/app.cpp +++ b/src/app.cpp @@ -66,6 +66,7 @@ #include "binsrv/events/event_view.hpp" #include "binsrv/events/protocol_traits_fwd.hpp" #include "binsrv/events/reader_context.hpp" +#include "binsrv/events/rewriter.hpp" #include "easymysql/connection.hpp" #include "easymysql/connection_config.hpp" @@ -756,16 +757,9 @@ void rewrite_and_process_binlog_event( // in rewrite mode we need to update next_event_position (and optional // checksum in the footer) in the received event data portion binsrv::events::event_storage buffer{}; - const auto event_copy_uv{binsrv::events::materialize( - current_event_v, buffer, - binsrv::events::materialization_type::force_add_checksum)}; - { - // TODO: optimize redundant checksum recalculation - const auto proxy{event_copy_uv.get_write_proxy()}; - proxy.get_common_header_updatable_view().set_next_event_position_raw( - static_cast(storage.get_current_position() + - event_copy_uv.get_total_size())); - } + const auto event_copy_uv{binsrv::events::rewriter::rewrite( + storage.get_last_transaction_sequence_number(), current_event_v, buffer, + storage.get_current_position())}; process_binlog_event(event_copy_uv, logger, context, storage); } diff --git a/src/binsrv/events/event_view.cpp b/src/binsrv/events/event_view.cpp index ea3002d..46aead9 100644 --- a/src/binsrv/events/event_view.cpp +++ b/src/binsrv/events/event_view.cpp @@ -27,7 +27,6 @@ #include "binsrv/events/code_type.hpp" #include "binsrv/events/common_header_view.hpp" -#include "binsrv/events/event_fwd.hpp" #include "binsrv/events/footer_view.hpp" // needed for extracting info from the FDE body #include "binsrv/events/format_description_body_impl.hpp" // IWYU pragma: keep @@ -166,74 +165,6 @@ event_view_base::get_footer_updatable_view() const { return footer_updatable_view{get_footer_updatable_raw()}; } -[[nodiscard]] event_updatable_view materialize(const event_view &event_v, - event_storage &buffer, - materialization_type mode) { - // mode adjustments for cases when nothing has to be changed - if (mode == materialization_type::force_remove_checksum && - !event_v.has_footer()) { - mode = materialization_type::leave_checksum_as_is; - } - if (mode == materialization_type::force_add_checksum && - event_v.has_footer()) { - mode = materialization_type::leave_checksum_as_is; - } - - // source does not have checksum, destination should - if (mode == materialization_type::force_add_checksum) { - assert(!event_v.has_footer()); - const auto event_size_with_footer{event_v.get_total_size() + - footer_view_base::size_in_bytes}; - const auto source_portion{event_v.get_portion()}; - buffer.reserve(event_size_with_footer); - buffer.assign(std::cbegin(source_portion), std::cend(source_portion)); - buffer.resize(event_size_with_footer); - const util::byte_span destination_portion{std::begin(buffer), - std::size(buffer)}; - event_updatable_view result{destination_portion, - event_v.get_post_header_size(), - footer_view_base::size_in_bytes}; - { - const auto write_proxy{result.get_write_proxy()}; - write_proxy.get_common_header_updatable_view().set_event_size_raw( - static_cast(event_size_with_footer)); - } - return result; - } - - // source has checksum, destination should not - if (mode == materialization_type::force_remove_checksum) { - assert(event_v.has_footer()); - const auto event_size_wo_footer{event_v.get_total_size() - - footer_view_base::size_in_bytes}; - const auto source_portion{ - event_v.get_portion().subspan(0U, event_size_wo_footer)}; - buffer.assign(std::cbegin(source_portion), std::cend(source_portion)); - const util::byte_span destination_portion{std::begin(buffer), - std::size(buffer)}; - event_updatable_view result{destination_portion, - event_v.get_post_header_size(), 0U}; - { - const auto write_proxy{result.get_write_proxy()}; - write_proxy.get_common_header_updatable_view().set_event_size_raw( - static_cast(event_size_wo_footer)); - } - return result; - } - - // either source has checksum and destination should or - // source does not have checksum and destination should not - assert(mode == materialization_type::leave_checksum_as_is); - - const auto source_portion{event_v.get_portion()}; - buffer.assign(std::cbegin(source_portion), std::cend(source_portion)); - const util::byte_span destination_portion{std::begin(buffer), - std::size(buffer)}; - return event_updatable_view{destination_portion, - event_v.get_post_header_size(), - event_v.get_footer_size()}; -} - std::ostream &operator<<(std::ostream &output, const event_view &obj) { return output << "common header: " << event_view::get_common_header_size() << " byte(s), post header: " << obj.get_post_header_size() diff --git a/src/binsrv/events/event_view.hpp b/src/binsrv/events/event_view.hpp index ada6f72..1a4446e 100644 --- a/src/binsrv/events/event_view.hpp +++ b/src/binsrv/events/event_view.hpp @@ -22,6 +22,7 @@ #include "binsrv/events/footer_view.hpp" // IWYU pragma: export #include "binsrv/events/protocol_traits_fwd.hpp" #include "binsrv/events/reader_context_fwd.hpp" +#include "binsrv/events/rewriter_fwd.hpp" #include "util/byte_span_fwd.hpp" @@ -124,9 +125,7 @@ class [[nodiscard]] event_view_base { class [[nodiscard]] event_updatable_view : private event_view_base { friend class event_view; - friend event_updatable_view materialize(const event_view &event_v, - event_storage &buffer, - materialization_type mode); + friend class rewriter; public: // an auxiliary proxy class that allows to update the content of an diff --git a/src/binsrv/events/event_view_fwd.hpp b/src/binsrv/events/event_view_fwd.hpp index 9fdd2ab..a87e809 100644 --- a/src/binsrv/events/event_view_fwd.hpp +++ b/src/binsrv/events/event_view_fwd.hpp @@ -26,15 +26,6 @@ namespace binsrv::events { class event_updatable_view; class event_view; -enum class materialization_type : std::uint8_t { - force_add_checksum, - force_remove_checksum, - leave_checksum_as_is -}; -[[nodiscard]] event_updatable_view materialize(const event_view &event_v, - event_storage &buffer, - materialization_type mode); - std::ostream &operator<<(std::ostream &output, const event_view &obj); } // namespace binsrv::events diff --git a/src/binsrv/events/gtid_log_post_header.hpp b/src/binsrv/events/gtid_log_post_header.hpp index 154d08f..82b9902 100644 --- a/src/binsrv/events/gtid_log_post_header.hpp +++ b/src/binsrv/events/gtid_log_post_header.hpp @@ -76,6 +76,9 @@ class [[nodiscard]] gtid_log_post_header { void set_last_committed_raw(std::int64_t last_committed) noexcept { last_committed_ = last_committed; } + void set_last_committed(events::seq_no_t last_committed) noexcept { + last_committed_ = static_cast(last_committed); + } [[nodiscard]] std::int64_t get_sequence_number_raw() const noexcept { return sequence_number_; @@ -86,6 +89,9 @@ class [[nodiscard]] gtid_log_post_header { void set_sequence_number_raw(std::int64_t sequence_number) noexcept { sequence_number_ = sequence_number; } + void set_sequence_number(events::seq_no_t sequence_number) noexcept { + sequence_number_ = static_cast(sequence_number); + } [[nodiscard]] static std::size_t calculate_encoded_size() noexcept { return size_in_bytes; diff --git a/src/binsrv/events/gtid_tagged_log_body_impl.cpp b/src/binsrv/events/gtid_tagged_log_body_impl.cpp index cab5246..0154073 100644 --- a/src/binsrv/events/gtid_tagged_log_body_impl.cpp +++ b/src/binsrv/events/gtid_tagged_log_body_impl.cpp @@ -162,8 +162,8 @@ generic_body_impl::generic_body_impl( // https://github.com/mysql/mysql-server/blob/mysql-8.4.6/libs/mysql/serialization/readme.md?plain=1#L113 // ::= - // { } Extracting - // + // { } + // Extracting std::size_t serializable_field_size{}; if (!util::extract_varlen_int_from_byte_span_checked( remainder, serializable_field_size)) { diff --git a/src/binsrv/events/gtid_tagged_log_body_impl.hpp b/src/binsrv/events/gtid_tagged_log_body_impl.hpp index 9718caa..0a2e7ba 100644 --- a/src/binsrv/events/gtid_tagged_log_body_impl.hpp +++ b/src/binsrv/events/gtid_tagged_log_body_impl.hpp @@ -87,6 +87,9 @@ template <> class [[nodiscard]] generic_body_impl { void set_last_committed_raw(std::int64_t last_committed) noexcept { last_committed_ = last_committed; } + void set_last_committed(events::seq_no_t last_committed) noexcept { + last_committed_ = static_cast(last_committed); + } [[nodiscard]] std::int64_t get_sequence_number_raw() const noexcept { return sequence_number_; @@ -97,6 +100,9 @@ template <> class [[nodiscard]] generic_body_impl { void set_sequence_number_raw(std::int64_t sequence_number) noexcept { sequence_number_ = sequence_number; } + void set_sequence_number(events::seq_no_t sequence_number) noexcept { + sequence_number_ = static_cast(sequence_number); + } [[nodiscard]] std::uint64_t get_immediate_commit_timestamp_raw() const noexcept { diff --git a/src/binsrv/events/rewriter.cpp b/src/binsrv/events/rewriter.cpp new file mode 100644 index 0000000..1d6ca54 --- /dev/null +++ b/src/binsrv/events/rewriter.cpp @@ -0,0 +1,330 @@ +// Copyright (c) 2023-2024 Percona and/or its affiliates. +// +// This program is free software; you can redistribute it and/or modify +// it under the terms of the GNU General Public License, version 2.0, +// as published by the Free Software Foundation. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License, version 2.0, for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program; if not, write to the Free Software +// Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + +#include "binsrv/events/rewriter.hpp" + +#include +#include +#include +#include +#include +#include + +// needed for 'event_storage' +#include // IWYU pragma: keep + +#include "binsrv/events/code_type.hpp" +#include "binsrv/events/common_types.hpp" +#include "binsrv/events/event_fwd.hpp" +#include "binsrv/events/event_view.hpp" +#include "binsrv/events/gtid_log_post_header.hpp" +#include "binsrv/events/gtid_tagged_log_body_impl.hpp" +#include "binsrv/events/protocol_traits_fwd.hpp" + +#include "util/byte_span_fwd.hpp" +#include "util/exception_location_helpers.hpp" + +namespace binsrv::events { + +[[nodiscard]] event_updatable_view +rewriter::materialize(const event_view &event_v, event_storage &buffer, + materialization_type mode) { + return generic_materialize( + event_v, buffer, mode, + [](materialization_type normalized_mode, + const event_updatable_view::write_proxy &proxy) { + if (normalized_mode != materialization_type::leave_checksum_as_is) { + const auto common_header_uv{proxy.get_common_header_updatable_view()}; + const auto total_size{std::size(proxy.get_updatable_portion())}; + common_header_uv.set_event_size_raw( + static_cast(total_size)); + } + }); +} + +[[nodiscard]] event_updatable_view rewriter::materialize_and_relocate( + const event_view &event_v, event_storage &buffer, materialization_type mode, + std::uint64_t offset) { + return generic_materialize( + event_v, buffer, mode, + [offset](materialization_type normalized_mode, + const event_updatable_view::write_proxy &proxy) { + const auto common_header_uv{proxy.get_common_header_updatable_view()}; + const auto total_size{std::size(proxy.get_updatable_portion())}; + if (normalized_mode != materialization_type::leave_checksum_as_is) { + common_header_uv.set_event_size_raw( + static_cast(total_size)); + } + common_header_uv.set_next_event_position_raw( + static_cast(offset + total_size)); + }); +} + +[[nodiscard]] event_updatable_view +rewriter::prepare_generic_materialize_internal(const event_view &event_v, + event_storage &buffer, + materialization_type &mode) { + // mode adjustments for cases when nothing has to be changed + if (mode == materialization_type::force_remove_checksum && + !event_v.has_footer()) { + mode = materialization_type::leave_checksum_as_is; + } + if (mode == materialization_type::force_add_checksum && + event_v.has_footer()) { + mode = materialization_type::leave_checksum_as_is; + } + + std::size_t destination_footer_size{}; + // source does not have checksum, destination should + switch (mode) { + case materialization_type::force_add_checksum: { + // source does not have checksum, destination should + assert(!event_v.has_footer()); + const auto event_size_with_footer{event_v.get_total_size() + + footer_view_base::size_in_bytes}; + const auto source_portion{event_v.get_portion()}; + buffer.reserve(event_size_with_footer); + buffer.assign(std::cbegin(source_portion), std::cend(source_portion)); + buffer.resize(event_size_with_footer); + destination_footer_size = footer_view_base::size_in_bytes; + } break; + case materialization_type::force_remove_checksum: { + // source has checksum, destination should not + assert(event_v.has_footer()); + const auto event_size_wo_footer{event_v.get_total_size() - + footer_view_base::size_in_bytes}; + const auto source_portion{ + event_v.get_portion().subspan(0U, event_size_wo_footer)}; + buffer.assign(std::cbegin(source_portion), std::cend(source_portion)); + destination_footer_size = 0U; + } break; + case materialization_type::leave_checksum_as_is: { + // either source has checksum and destination should or + // source does not have checksum and destination should not + const auto source_portion{event_v.get_portion()}; + buffer.assign(std::cbegin(source_portion), std::cend(source_portion)); + destination_footer_size = event_v.get_footer_size(); + } break; + } + const util::byte_span destination_portion{std::begin(buffer), + std::size(buffer)}; + event_updatable_view result{destination_portion, + event_v.get_post_header_size(), + destination_footer_size}; + return result; +} + +[[nodiscard]] event_updatable_view +rewriter::rewrite(seq_no_t last_local_sequence_number, + const event_view ¤t_event_v, + binsrv::events::event_storage &buffer, std::uint64_t offset) { + switch (current_event_v.get_common_header_view().get_type_code()) { + case code_type::gtid_log: + case code_type::anonymous_gtid_log: + return rewrite_gtid_log_internal(last_local_sequence_number, + current_event_v, buffer, offset); + case code_type::gtid_tagged_log: + return rewrite_gtid_tagged_log_internal(last_local_sequence_number, + current_event_v, buffer, offset); + break; + default: + return materialize_and_relocate(current_event_v, buffer, + materialization_type::force_add_checksum, + offset); + } +} + +void rewriter::fix_sequence_number_and_last_committed( + seq_no_t last_local_sequence_number, seq_no_t &remote_sequence_number, + seq_no_t &remote_last_committed) { + if (remote_last_committed >= remote_sequence_number) { + util::exception_location().raise( + "remote last_committed is greater than or equal to remote " + "sequence_number"); + } + // calculating delta between the sequence_number and last_committed in + // remote event post header + auto delta{remote_sequence_number - remote_last_committed}; + // sequence_number in the rewritten event must be set to the next number + // after last_local_sequence_number + remote_sequence_number = last_local_sequence_number + 1ULL; + // last_committed in the rewritten event must be set to the value which is + // 'delta' behind the new sequence_number, but not less than 0 + remote_last_committed = remote_sequence_number; + if (remote_last_committed > delta) { + remote_last_committed -= delta; + } else { + // only the very first event in the binlog file (which has sequence_number + // equal to 1) can have last_committed equal to 0, for all the other events + // last_committed must be at least 1 + remote_last_committed = (remote_sequence_number == 1ULL ? 0ULL : 1ULL); + } +} + +[[nodiscard]] event_updatable_view rewriter::rewrite_gtid_log_internal( + seq_no_t last_local_sequence_number, const event_view ¤t_event_v, + binsrv::events::event_storage &buffer, std::uint64_t offset) { + assert(current_event_v.get_common_header_view().get_type_code() == + code_type::gtid_log || + current_event_v.get_common_header_view().get_type_code() == + code_type::anonymous_gtid_log); + const auto gtid_log_fixer{ + [offset, last_local_sequence_number]( + materialization_type normalized_mode, + const event_updatable_view::write_proxy &proxy) { + const auto common_header_uv{proxy.get_common_header_updatable_view()}; + const auto total_size{std::size(proxy.get_updatable_portion())}; + + if (normalized_mode != materialization_type::leave_checksum_as_is) { + common_header_uv.set_event_size_raw( + static_cast(total_size)); + } + common_header_uv.set_next_event_position_raw( + static_cast(offset + total_size)); + + auto post_header_portion{proxy.get_post_header_updatable_raw()}; + // TODO: rework this with direct inplace updates when we implement + // post header updatable views (at least for GTID_LOG and + // ANONYMOUS_GTID_LOG events) + + // instantiating an instance of gtid_log_post_header (works for both + // GTID_LOG and ANONYMOUS_GTID_LOG) from the event post_header data span + gtid_log_post_header updated_gtid_log_post_header{post_header_portion}; + + // extracting and fixing the sequence_number and last_committed values + // based on the value of the last written sequence_number in the binlog + // (last_local_sequence_number) + auto remote_sequence_number{ + updated_gtid_log_post_header.get_sequence_number()}; + auto remote_last_committed{ + updated_gtid_log_post_header.get_last_committed()}; + fix_sequence_number_and_last_committed(last_local_sequence_number, + remote_sequence_number, + remote_last_committed); + updated_gtid_log_post_header.set_sequence_number( + remote_sequence_number); + updated_gtid_log_post_header.set_last_committed(remote_last_committed); + // serializing the updated post header back to the event post_header + // data span + updated_gtid_log_post_header.encode_to(post_header_portion); + }}; + return generic_materialize(current_event_v, buffer, + materialization_type::force_add_checksum, + gtid_log_fixer); +} + +[[nodiscard]] event_updatable_view rewriter::rewrite_gtid_tagged_log_internal( + seq_no_t last_local_sequence_number, const event_view ¤t_event_v, + binsrv::events::event_storage &buffer, std::uint64_t offset) { + assert(current_event_v.get_common_header_view().get_type_code() == + code_type::gtid_tagged_log); + + // the original size of the current event + const std::size_t original_event_size{current_event_v.get_total_size()}; + + generic_body_impl updated_gtid_tagged_log_body{ + current_event_v.get_body_raw()}; + // the original size of the transaction extracted from the event body + const std::uint64_t original_transaction_length{ + updated_gtid_tagged_log_body.get_transaction_length_raw()}; + if (original_transaction_length < original_event_size) { + util::exception_location().raise( + "transaction length in the gtid tagged log event is less than the " + "event size"); + } + + // the accumulated size of all the events in the transaction apart from the + // current one + const std::uint64_t transaction_tail_length{original_transaction_length - + original_event_size}; + + // extracting and fixing the sequence_number and last_committed values + // based on the value of the last written sequence_number in the binlog + // (last_local_sequence_number) + auto remote_sequence_number{ + updated_gtid_tagged_log_body.get_sequence_number()}; + auto remote_last_committed{updated_gtid_tagged_log_body.get_last_committed()}; + fix_sequence_number_and_last_committed(last_local_sequence_number, + remote_sequence_number, + remote_last_committed); + updated_gtid_tagged_log_body.set_sequence_number(remote_sequence_number); + updated_gtid_tagged_log_body.set_last_committed(remote_last_committed); + + // Because of the updated fields, which are serialized using variable-length + // encoding, the size of the event may change. We need to recalculate the + // event size taking into account that a change in the event size should also + // be reflected in the transaction_length field in the event body, which in + // turn may also change its size and so on. To handle this, we are + // recalculating the event size in a loop until it stabilizes (realistically, + // it should converge in 2 iterations max). + const auto event_size_calculator{ + [&updated_gtid_tagged_log_body, + transaction_tail_length](std::size_t event_size_candidate) { + // post_header size of the GTID_TAGGED_LOG event is zero (so not + // included in calculations) + updated_gtid_tagged_log_body.set_transaction_length_raw( + event_size_candidate + transaction_tail_length); + return default_common_header_length + + updated_gtid_tagged_log_body.calculate_encoded_size() + + default_footer_length; + }}; + std::size_t event_size_candidate{original_event_size}; + std::size_t recalculated_event_size{}; + std::size_t iteration{0U}; + static constexpr std::size_t max_number_of_iterations{9U}; + while (((recalculated_event_size = event_size_calculator( + event_size_candidate)) != event_size_candidate) && + (iteration < max_number_of_iterations)) { + event_size_candidate = recalculated_event_size; + ++iteration; + } + assert(iteration < max_number_of_iterations); + // After this loop both recalculated_event_size and event_size_candidate + // variables hold the final stabilized size of the rewritten event, which is + // consistent with the transaction_length field in the event body. Moreover, + // the transaction_length field in the event body has also been updated + // accordingly. + + // assembling the rewritten event in the provided buffer + // recalculated_event_size includes + // common_header size(19) + post_header size (0) + body size (variable) + + // footer size (4) + buffer.resize(recalculated_event_size); + // copying common_header from the original event to the buffer + std::ranges::copy(current_event_v.get_common_header_raw(), buffer.begin()); + // no need to copy post_header as its size is zero for GTID_TAGGED_LOG events + event_updatable_view result{buffer, 0U, default_footer_length}; + { + // creating a write proxy here so that the crc in the footer is recalculated + // at the end of this block when the proxy is destroyed + const auto write_proxy{result.get_write_proxy()}; + const auto common_header_uv{write_proxy.get_common_header_updatable_view()}; + // updating the event_size and next_event_position fields in the common + // header based on the recalculated event size + common_header_uv.set_event_size_raw( + static_cast(recalculated_event_size)); + common_header_uv.set_next_event_position_raw( + static_cast(offset + recalculated_event_size)); + + // encoding the updated body (with fixed sequence_number and last_committed + // values, and possibly updated transaction_length value) to the body part + // of the buffer + auto body_portion{write_proxy.get_body_updatable_raw()}; + updated_gtid_tagged_log_body.encode_to(body_portion); + } + return result; +} + +} // namespace binsrv::events diff --git a/src/binsrv/events/rewriter.hpp b/src/binsrv/events/rewriter.hpp new file mode 100644 index 0000000..747e98a --- /dev/null +++ b/src/binsrv/events/rewriter.hpp @@ -0,0 +1,128 @@ +// Copyright (c) 2023-2024 Percona and/or its affiliates. +// +// This program is free software; you can redistribute it and/or modify +// it under the terms of the GNU General Public License, version 2.0, +// as published by the Free Software Foundation. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License, version 2.0, for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program; if not, write to the Free Software +// Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + +#ifndef BINSRV_EVENTS_REWRITER_HPP +#define BINSRV_EVENTS_REWRITER_HPP + +#include "binsrv/events/rewriter_fwd.hpp" // IWYU pragma: export + +#include + +#include "binsrv/events/common_types.hpp" +#include "binsrv/events/event_fwd.hpp" +#include "binsrv/events/event_view.hpp" + +namespace binsrv::events { + +class rewriter { +public: + // Generic materialization function that will create a copy of the event data + // from the 'event_v' and put it into the 'buffer' with the following + // adjustments: + // - if 'mode' is 'force_add_checksum', the materialized event will have a + // footer with a checksum even if the original event does not have it + // - if 'mode' is 'force_remove_checksum', the materialized event will not + // have a footer with a checksum even if the original event has it + // - if 'mode' is 'leave_checksum_as_is', the materialized event will have a + // footer with a checksum if and only if the original event has it. + // This function will also call a generic 'modification_functor' that allows + // to perform any additional modifications to the materialized event (e.g. + // change some fields in the common header, post header or body. The signature + // of the 'modification_functor' should be the following: + // void modify( + // materialization_type, + // const event_updatable_view::write_proxy + // ); + template + [[nodiscard]] static event_updatable_view + generic_materialize(const event_view &event_v, event_storage &buffer, + materialization_type mode, + const ModificationFunctor &modification_functor); + + // Simplified materialization functions for one of the most common use cases + // when no additional modifications are needed (only 'event_size' in the + // common header may be updated if the footer was added or removed). + [[nodiscard]] static event_updatable_view + materialize(const event_view &event_v, event_storage &buffer, + materialization_type mode); + + // Simplified materialization functions for one of the most common use cases + // when the event needs to be put into a datafile at the specified 'offset' + // (in this case 'next_event_position' in the common header will be updated + // and 'event_size' in the common header may be updated if the footer was + // added or removed). + [[nodiscard]] static event_updatable_view + materialize_and_relocate(const event_view &event_v, event_storage &buffer, + materialization_type mode, std::uint64_t offset); + + // This methods performs required adjustments for event relocation: + // - for all events it updates next_event_position field in the + // common header + // - for all events it adds a footer with the checksum and + // changes event_size in the common header if it is missing + // - for GTID_LOG and ANONYMOUS_GTID_LOG events it updates sequence_number + // and last_committed fields in the event post header + // - for GTID_TAGGED_LOG events it updates sequence_number and + // last_committed fields in the event body and may change + // transaction_length field in the event body and event size in the + // common header accordingly (because of variable length encoding of + // the event body elements) + [[nodiscard]] static event_updatable_view + rewrite(seq_no_t last_local_sequence_number, + const event_view ¤t_event_v, + binsrv::events::event_storage &buffer, std::uint64_t offset); + +private: + [[nodiscard]] static event_updatable_view + prepare_generic_materialize_internal(const event_view &event_v, + event_storage &buffer, + materialization_type &mode); + // fixes the sequence_number and last_committed values extracted from + // GTID_LOG and ANONYMOUS_GTID_LOG events based on the value of the last + // written sequence_number in the binlog (last_local_sequence_number) + static void + fix_sequence_number_and_last_committed(seq_no_t last_local_sequence_number, + seq_no_t &remote_sequence_number, + seq_no_t &remote_last_committed); + + [[nodiscard]] static event_updatable_view rewrite_gtid_log_internal( + seq_no_t last_local_sequence_number, const event_view ¤t_event_v, + binsrv::events::event_storage &buffer, std::uint64_t offset); + + [[nodiscard]] static event_updatable_view rewrite_gtid_tagged_log_internal( + seq_no_t last_local_sequence_number, const event_view ¤t_event_v, + binsrv::events::event_storage &buffer, std::uint64_t offset); +}; + +template +[[nodiscard]] event_updatable_view +rewriter::generic_materialize(const event_view &event_v, event_storage &buffer, + materialization_type mode, + const ModificationFunctor &modification_functor) { + // mode may be modified (normalized) by this call + const auto result{ + prepare_generic_materialize_internal(event_v, buffer, mode)}; + { + // we are creating a write_proxy here so that the checksum will be properly + // recalculated after all modifications + const auto write_proxy{result.get_write_proxy()}; + modification_functor(mode, write_proxy); + } + return result; +} + +} // namespace binsrv::events + +#endif // BINSRV_EVENTS_REWRITER_HPP diff --git a/src/binsrv/events/rewriter_fwd.hpp b/src/binsrv/events/rewriter_fwd.hpp new file mode 100644 index 0000000..2fccc05 --- /dev/null +++ b/src/binsrv/events/rewriter_fwd.hpp @@ -0,0 +1,33 @@ +// Copyright (c) 2023-2024 Percona and/or its affiliates. +// +// This program is free software; you can redistribute it and/or modify +// it under the terms of the GNU General Public License, version 2.0, +// as published by the Free Software Foundation. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License, version 2.0, for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program; if not, write to the Free Software +// Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + +#ifndef BINSRV_EVENTS_REWRITER_FWD_HPP +#define BINSRV_EVENTS_REWRITER_FWD_HPP + +#include + +namespace binsrv::events { + +class rewriter; + +enum class materialization_type : std::uint8_t { + force_add_checksum, + force_remove_checksum, + leave_checksum_as_is +}; + +} // namespace binsrv::events + +#endif // BINSRV_EVENTS_REWRITER_FWD_HPP diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 9b15daf..13960e8 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -47,7 +47,7 @@ set(gtid_set_source_files ${PROJECT_SOURCE_DIR}/src/binsrv/gtids/gtid_set.cpp ) -set(event_set_source_files +set(event_source_files ${PROJECT_SOURCE_DIR}/src/binsrv/ctime_timestamp.cpp ${PROJECT_SOURCE_DIR}/src/binsrv/composite_binlog_name_fwd.hpp @@ -154,6 +154,10 @@ set(event_set_source_files ${PROJECT_SOURCE_DIR}/src/binsrv/events/reader_context.hpp ${PROJECT_SOURCE_DIR}/src/binsrv/events/reader_context.cpp + ${PROJECT_SOURCE_DIR}/src/binsrv/events/rewriter_fwd.hpp + ${PROJECT_SOURCE_DIR}/src/binsrv/events/rewriter.hpp + ${PROJECT_SOURCE_DIR}/src/binsrv/events/rewriter.cpp + ${PROJECT_SOURCE_DIR}/src/binsrv/events/rotate_body_impl_fwd.hpp ${PROJECT_SOURCE_DIR}/src/binsrv/events/rotate_body_impl.hpp ${PROJECT_SOURCE_DIR}/src/binsrv/events/rotate_body_impl.cpp @@ -243,7 +247,7 @@ set_target_properties(gtid_set_test PROPERTIES CXX_EXTENSIONS NO ) -add_executable(event_test event_test.cpp ${uuid_source_files} ${tag_source_files} ${gtid_source_files} ${gtid_set_source_files} ${event_set_source_files}) +add_executable(event_test event_test.cpp ${uuid_source_files} ${tag_source_files} ${gtid_source_files} ${gtid_set_source_files} ${event_source_files}) target_include_directories(event_test PRIVATE "${PROJECT_SOURCE_DIR}/src") target_link_libraries(event_test PRIVATE diff --git a/tests/event_test.cpp b/tests/event_test.cpp index 93e72bd..84a6602 100644 --- a/tests/event_test.cpp +++ b/tests/event_test.cpp @@ -44,6 +44,7 @@ #include "binsrv/events/gtid_log_post_header.hpp" #include "binsrv/events/protocol_traits_fwd.hpp" #include "binsrv/events/reader_context.hpp" +#include "binsrv/events/rewriter.hpp" #include "util/byte_span_fwd.hpp" #include "util/semantic_version.hpp" @@ -308,7 +309,7 @@ BOOST_AUTO_TEST_CASE(EventMaterialization) { // checking materialization of an event that has checksum const binsrv::events::event_view checksum_as_is_generated_v{ - binsrv::events::materialize( + binsrv::events::rewriter::materialize( generated_event_v, materialization_buffer, binsrv::events::materialization_type::leave_checksum_as_is)}; BOOST_CHECK(checksum_as_is_generated_v.has_footer()); @@ -321,7 +322,7 @@ BOOST_AUTO_TEST_CASE(EventMaterialization) { context_with_checksum, util::const_byte_span{materialization_buffer}}; const binsrv::events::event_view force_add_checksum_generated_v{ - binsrv::events::materialize( + binsrv::events::rewriter::materialize( generated_event_v, materialization_buffer, binsrv::events::materialization_type::force_add_checksum)}; BOOST_CHECK(force_add_checksum_generated_v.has_footer()); @@ -334,7 +335,7 @@ BOOST_AUTO_TEST_CASE(EventMaterialization) { context_with_checksum, util::const_byte_span{materialization_buffer}}; const binsrv::events::event_view force_remove_checksum_generated_v{ - binsrv::events::materialize( + binsrv::events::rewriter::materialize( generated_event_v, materialization_buffer, binsrv::events::materialization_type::force_remove_checksum)}; BOOST_CHECK(!force_remove_checksum_generated_v.has_footer()); @@ -351,7 +352,7 @@ BOOST_AUTO_TEST_CASE(EventMaterialization) { context_wo_checksum, util::const_byte_span{event_wo_footer_buffer}}; const binsrv::events::event_view checksum_as_is_copied_v{ - binsrv::events::materialize( + binsrv::events::rewriter::materialize( copied_event_v, materialization_buffer, binsrv::events::materialization_type::leave_checksum_as_is)}; BOOST_CHECK(!checksum_as_is_copied_v.has_footer()); @@ -362,7 +363,7 @@ BOOST_AUTO_TEST_CASE(EventMaterialization) { context_wo_checksum, util::const_byte_span{materialization_buffer}}; const binsrv::events::event_view force_add_checksum_copied_v{ - binsrv::events::materialize( + binsrv::events::rewriter::materialize( copied_event_v, materialization_buffer, binsrv::events::materialization_type::force_add_checksum)}; BOOST_CHECK(force_add_checksum_copied_v.has_footer()); @@ -375,7 +376,7 @@ BOOST_AUTO_TEST_CASE(EventMaterialization) { context_with_checksum, util::const_byte_span{materialization_buffer}}; const binsrv::events::event_view force_remove_checksum_copied_v{ - binsrv::events::materialize( + binsrv::events::rewriter::materialize( copied_event_v, materialization_buffer, binsrv::events::materialization_type::force_remove_checksum)}; BOOST_CHECK(!force_remove_checksum_copied_v.has_footer());