Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,10 @@ set(source_files
src/binsrv/events/reader_context.hpp
src/binsrv/events/reader_context.cpp

src/binsrv/events/rewriter_fwd.hpp
src/binsrv/events/rewriter.hpp
src/binsrv/events/rewriter.cpp

src/binsrv/events/rotate_body_impl_fwd.hpp
src/binsrv/events/rotate_body_impl.hpp
src/binsrv/events/rotate_body_impl.cpp
Expand Down
14 changes: 4 additions & 10 deletions src/app.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
#include "binsrv/events/event_view.hpp"
#include "binsrv/events/protocol_traits_fwd.hpp"
#include "binsrv/events/reader_context.hpp"
#include "binsrv/events/rewriter.hpp"

#include "easymysql/connection.hpp"
#include "easymysql/connection_config.hpp"
Expand Down Expand Up @@ -756,16 +757,9 @@ void rewrite_and_process_binlog_event(
// in rewrite mode we need to update next_event_position (and optional
// checksum in the footer) in the received event data portion
binsrv::events::event_storage buffer{};
const auto event_copy_uv{binsrv::events::materialize(
current_event_v, buffer,
binsrv::events::materialization_type::force_add_checksum)};
{
// TODO: optimize redundant checksum recalculation
const auto proxy{event_copy_uv.get_write_proxy()};
proxy.get_common_header_updatable_view().set_next_event_position_raw(
static_cast<std::uint32_t>(storage.get_current_position() +
event_copy_uv.get_total_size()));
}
const auto event_copy_uv{binsrv::events::rewriter::rewrite(
storage.get_last_transaction_sequence_number(), current_event_v, buffer,
storage.get_current_position())};
process_binlog_event(event_copy_uv, logger, context, storage);
}

Expand Down
69 changes: 0 additions & 69 deletions src/binsrv/events/event_view.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@

#include "binsrv/events/code_type.hpp"
#include "binsrv/events/common_header_view.hpp"
#include "binsrv/events/event_fwd.hpp"
#include "binsrv/events/footer_view.hpp"
// needed for extracting info from the FDE body
#include "binsrv/events/format_description_body_impl.hpp" // IWYU pragma: keep
Expand Down Expand Up @@ -166,74 +165,6 @@ event_view_base::get_footer_updatable_view() const {
return footer_updatable_view{get_footer_updatable_raw()};
}

[[nodiscard]] event_updatable_view materialize(const event_view &event_v,
event_storage &buffer,
materialization_type mode) {
// mode adjustments for cases when nothing has to be changed
if (mode == materialization_type::force_remove_checksum &&
!event_v.has_footer()) {
mode = materialization_type::leave_checksum_as_is;
}
if (mode == materialization_type::force_add_checksum &&
event_v.has_footer()) {
mode = materialization_type::leave_checksum_as_is;
}

// source does not have checksum, destination should
if (mode == materialization_type::force_add_checksum) {
assert(!event_v.has_footer());
const auto event_size_with_footer{event_v.get_total_size() +
footer_view_base::size_in_bytes};
const auto source_portion{event_v.get_portion()};
buffer.reserve(event_size_with_footer);
buffer.assign(std::cbegin(source_portion), std::cend(source_portion));
buffer.resize(event_size_with_footer);
const util::byte_span destination_portion{std::begin(buffer),
std::size(buffer)};
event_updatable_view result{destination_portion,
event_v.get_post_header_size(),
footer_view_base::size_in_bytes};
{
const auto write_proxy{result.get_write_proxy()};
write_proxy.get_common_header_updatable_view().set_event_size_raw(
static_cast<std::uint32_t>(event_size_with_footer));
}
return result;
}

// source has checksum, destination should not
if (mode == materialization_type::force_remove_checksum) {
assert(event_v.has_footer());
const auto event_size_wo_footer{event_v.get_total_size() -
footer_view_base::size_in_bytes};
const auto source_portion{
event_v.get_portion().subspan(0U, event_size_wo_footer)};
buffer.assign(std::cbegin(source_portion), std::cend(source_portion));
const util::byte_span destination_portion{std::begin(buffer),
std::size(buffer)};
event_updatable_view result{destination_portion,
event_v.get_post_header_size(), 0U};
{
const auto write_proxy{result.get_write_proxy()};
write_proxy.get_common_header_updatable_view().set_event_size_raw(
static_cast<std::uint32_t>(event_size_wo_footer));
}
return result;
}

// either source has checksum and destination should or
// source does not have checksum and destination should not
assert(mode == materialization_type::leave_checksum_as_is);

const auto source_portion{event_v.get_portion()};
buffer.assign(std::cbegin(source_portion), std::cend(source_portion));
const util::byte_span destination_portion{std::begin(buffer),
std::size(buffer)};
return event_updatable_view{destination_portion,
event_v.get_post_header_size(),
event_v.get_footer_size()};
}

std::ostream &operator<<(std::ostream &output, const event_view &obj) {
return output << "common header: " << event_view::get_common_header_size()
<< " byte(s), post header: " << obj.get_post_header_size()
Expand Down
5 changes: 2 additions & 3 deletions src/binsrv/events/event_view.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "binsrv/events/footer_view.hpp" // IWYU pragma: export
#include "binsrv/events/protocol_traits_fwd.hpp"
#include "binsrv/events/reader_context_fwd.hpp"
#include "binsrv/events/rewriter_fwd.hpp"

#include "util/byte_span_fwd.hpp"

Expand Down Expand Up @@ -124,9 +125,7 @@ class [[nodiscard]] event_view_base {

class [[nodiscard]] event_updatable_view : private event_view_base {
friend class event_view;
friend event_updatable_view materialize(const event_view &event_v,
event_storage &buffer,
materialization_type mode);
friend class rewriter;

public:
// an auxiliary proxy class that allows to update the content of an
Expand Down
9 changes: 0 additions & 9 deletions src/binsrv/events/event_view_fwd.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,6 @@ namespace binsrv::events {
class event_updatable_view;
class event_view;

enum class materialization_type : std::uint8_t {
force_add_checksum,
force_remove_checksum,
leave_checksum_as_is
};
[[nodiscard]] event_updatable_view materialize(const event_view &event_v,
event_storage &buffer,
materialization_type mode);

std::ostream &operator<<(std::ostream &output, const event_view &obj);

} // namespace binsrv::events
Expand Down
6 changes: 6 additions & 0 deletions src/binsrv/events/gtid_log_post_header.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ class [[nodiscard]] gtid_log_post_header {
void set_last_committed_raw(std::int64_t last_committed) noexcept {
last_committed_ = last_committed;
}
void set_last_committed(events::seq_no_t last_committed) noexcept {
last_committed_ = static_cast<std::int64_t>(last_committed);
}

[[nodiscard]] std::int64_t get_sequence_number_raw() const noexcept {
return sequence_number_;
Expand All @@ -86,6 +89,9 @@ class [[nodiscard]] gtid_log_post_header {
void set_sequence_number_raw(std::int64_t sequence_number) noexcept {
sequence_number_ = sequence_number;
}
void set_sequence_number(events::seq_no_t sequence_number) noexcept {
sequence_number_ = static_cast<std::int64_t>(sequence_number);
}

[[nodiscard]] static std::size_t calculate_encoded_size() noexcept {
return size_in_bytes;
Expand Down
4 changes: 2 additions & 2 deletions src/binsrv/events/gtid_tagged_log_body_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -162,8 +162,8 @@ generic_body_impl<code_type::gtid_tagged_log>::generic_body_impl(

// https://github.com/mysql/mysql-server/blob/mysql-8.4.6/libs/mysql/serialization/readme.md?plain=1#L113
// <message_format> ::= <serializable_field_size>
// <last_non_ignorable_field_id> { <type_field> } Extracting
// <serializable_field_size>
// <last_non_ignorable_field_id> { <type_field> }
// Extracting <serializable_field_size>
std::size_t serializable_field_size{};
if (!util::extract_varlen_int_from_byte_span_checked(
remainder, serializable_field_size)) {
Expand Down
6 changes: 6 additions & 0 deletions src/binsrv/events/gtid_tagged_log_body_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,9 @@ template <> class [[nodiscard]] generic_body_impl<code_type::gtid_tagged_log> {
void set_last_committed_raw(std::int64_t last_committed) noexcept {
last_committed_ = last_committed;
}
void set_last_committed(events::seq_no_t last_committed) noexcept {
last_committed_ = static_cast<std::int64_t>(last_committed);
}

[[nodiscard]] std::int64_t get_sequence_number_raw() const noexcept {
return sequence_number_;
Expand All @@ -97,6 +100,9 @@ template <> class [[nodiscard]] generic_body_impl<code_type::gtid_tagged_log> {
void set_sequence_number_raw(std::int64_t sequence_number) noexcept {
sequence_number_ = sequence_number;
}
void set_sequence_number(events::seq_no_t sequence_number) noexcept {
sequence_number_ = static_cast<std::int64_t>(sequence_number);
}

[[nodiscard]] std::uint64_t
get_immediate_commit_timestamp_raw() const noexcept {
Expand Down
Loading
Loading