Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions components/core/src/clp_s/ffi/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
set(
CLP_S_FFI_SFA_SOURCES
sfa/ClpArchiveDecoder.cpp
sfa/ClpArchiveDecoder.hpp
sfa/SfaErrorCode.cpp
sfa/SfaErrorCode.hpp
sfa/ClpArchiveReader.cpp
sfa/ClpArchiveReader.hpp
sfa/LogEvent.hpp
)

if(CLP_BUILD_CLP_S_ARCHIVEREADER)
Expand Down
145 changes: 145 additions & 0 deletions components/core/src/clp_s/ffi/sfa/ClpArchiveDecoder.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
#include "ClpArchiveDecoder.hpp"

#include <cstdint>
#include <exception>
#include <memory>
#include <new>
#include <optional>
#include <span>
#include <string>
#include <utility>

#include <spdlog/spdlog.h>
#include <ystdlib/error_handling/Result.hpp>

#include <clp_s/ArchiveReader.hpp>
#include <clp_s/Defs.hpp>
#include <clp_s/SchemaReader.hpp>

#include "ClpArchiveReader.hpp"
#include "LogEvent.hpp"
#include "SfaErrorCode.hpp"

namespace clp_s::ffi::sfa {
template <typename ReturnType>
using Result = ystdlib::error_handling::Result<ReturnType>;

auto ClpArchiveDecoder::create(ClpArchiveReader& reader) -> Result<ClpArchiveDecoder> {
try {
return ClpArchiveDecoder{
reader.m_archive_reader->read_all_tables(),
reader.m_archive_reader->has_log_order()
};
} catch (std::bad_alloc const&) {
SPDLOG_ERROR("Failed to create ClpArchiveDecoder: out of memory.");
return SfaErrorCode{SfaErrorCodeEnum::NoMemory};
} catch (std::exception const& ex) {
Comment on lines +33 to +36
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if catching bad_alloc here and in other functions, and having a dedicated SfaErrorCodeEnum::NoMemory is necessarily.

SPDLOG_ERROR("Exception while creating ClpArchiveDecoder: {}", ex.what());
return SfaErrorCode{SfaErrorCodeEnum::Failure};
}
}

ClpArchiveDecoder::ClpArchiveDecoder(ClpArchiveDecoder&& rhs) noexcept {
move_from(rhs);
}

auto ClpArchiveDecoder::operator=(ClpArchiveDecoder&& rhs) noexcept -> ClpArchiveDecoder& {
if (this == &rhs) {
return *this;
}

close();
move_from(rhs);
return *this;
}

ClpArchiveDecoder::~ClpArchiveDecoder() noexcept {
close();
}

auto ClpArchiveDecoder::close() noexcept -> void {
m_tables.clear();
m_log_events.clear();
m_has_log_order = false;
}

auto ClpArchiveDecoder::move_from(ClpArchiveDecoder& rhs) noexcept -> void {
m_tables = std::move(rhs.m_tables);
m_log_events = std::move(rhs.m_log_events);
m_has_log_order = std::exchange(rhs.m_has_log_order, false);
}

auto ClpArchiveDecoder::get_next_log_event() -> Result<std::optional<LogEvent>> {
try {
auto const has_next_log_event
= m_has_log_order ? decode_next_log_event_in_order() : decode_next_log_event();
if (false == has_next_log_event) {
return std::nullopt;
}

return m_log_events.back();
} catch (std::bad_alloc const&) {
SPDLOG_ERROR("Failed to decode log event in ClpArchiveDecoder: out of memory.");
return SfaErrorCode{SfaErrorCodeEnum::NoMemory};
} catch (std::exception const& ex) {
SPDLOG_ERROR("Exception while decoding log event in ClpArchiveDecoder: {}", ex.what());
return SfaErrorCode{SfaErrorCodeEnum::Failure};
}
}

auto ClpArchiveDecoder::collect_log_events() -> Result<std::span<LogEvent const>> {
while (YSTDLIB_ERROR_HANDLING_TRYX(get_next_log_event()).has_value()) {}

return std::span<LogEvent const>{m_log_events};
}

auto ClpArchiveDecoder::append_next_log_event(clp_s::SchemaReader& table) -> bool {
std::string message;
epochtime_t timestamp{0};
int64_t log_event_idx{0};

if (table.get_next_message_with_metadata(message, timestamp, log_event_idx)) {
m_log_events.emplace_back(message, timestamp, log_event_idx);
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
m_log_events.emplace_back(message, timestamp, log_event_idx);
m_log_events.emplace_back(std::move(message), timestamp, log_event_idx);

return true;
}

return false;
}

auto ClpArchiveDecoder::decode_next_log_event() -> bool {
for (auto const& table : m_tables) {
if (table->done()) {
continue;
}

return append_next_log_event(*table);
}

return false;
}

auto ClpArchiveDecoder::decode_next_log_event_in_order() -> bool {
std::shared_ptr<clp_s::SchemaReader> next_table;
int64_t next_log_event_idx{0};
bool found_next_table{false};

for (auto const& table : m_tables) {
if (table->done()) {
continue;
}

auto const table_log_event_idx{table->get_next_log_event_idx()};
if (false == found_next_table || table_log_event_idx < next_log_event_idx) {
next_table = table;
next_log_event_idx = table_log_event_idx;
found_next_table = true;
}
}

if (false == found_next_table) {
return false;
}
Comment on lines +122 to +141
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
std::shared_ptr<clp_s::SchemaReader> next_table;
int64_t next_log_event_idx{0};
bool found_next_table{false};
for (auto const& table : m_tables) {
if (table->done()) {
continue;
}
auto const table_log_event_idx{table->get_next_log_event_idx()};
if (false == found_next_table || table_log_event_idx < next_log_event_idx) {
next_table = table;
next_log_event_idx = table_log_event_idx;
found_next_table = true;
}
}
if (false == found_next_table) {
return false;
}
std::shared_ptr<clp_s::SchemaReader> next_table;
int64_t next_log_event_idx{INT64_MAX};
for (auto const& table : m_tables) {
if (table->done()) {
continue;
}
auto const table_log_event_idx{table->get_next_log_event_idx()};
if (table_log_event_idx < next_log_event_idx) {
next_table = table;
next_log_event_idx = table_log_event_idx;
}
}
if (nullptr == next_table) {
return false;
}
  1. Does removing found_next_table make this function more understandable?
  2. The function looks correct to me but I'm not fully confident. Best to ask @gibber9809 to take a look.


return append_next_log_event(*next_table);
}
} // namespace clp_s::ffi::sfa
130 changes: 130 additions & 0 deletions components/core/src/clp_s/ffi/sfa/ClpArchiveDecoder.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
#ifndef CLP_S_FFI_SFA_CLPARCHIVEDECODER_HPP
#define CLP_S_FFI_SFA_CLPARCHIVEDECODER_HPP

#include <memory>
#include <optional>
#include <span>
#include <utility>
#include <vector>

#include <ystdlib/error_handling/Result.hpp>

#include "LogEvent.hpp"

namespace clp_s {
// Forward declaration
class SchemaReader;
} // namespace clp_s

namespace clp_s::ffi::sfa {
class ClpArchiveReader;

/**
* Decoder that iterates over decoded log events from a `ClpArchiveReader`.
*/
class ClpArchiveDecoder {
public:
/**
* Creates a decoder from an initialized archive reader and preloads its schema tables.
*
* @param reader Initialized archive reader to decode log events from.
* @return A result containing the constructed `ClpArchiveDecoder` on success, or an error code
* if initialization fails:
* - `SfaErrorCodeEnum::Failure` if schema table loading fails.
* - `SfaErrorCodeEnum::NoMemory` if allocating decoder state fails.
*/
[[nodiscard]] static auto create(ClpArchiveReader& reader)
-> ystdlib::error_handling::Result<ClpArchiveDecoder>;

// Destructor
~ClpArchiveDecoder() noexcept;

// Delete copy constructor and assignment operator
ClpArchiveDecoder(ClpArchiveDecoder const&) = delete;
auto operator=(ClpArchiveDecoder const&) -> ClpArchiveDecoder& = delete;

// Move constructor and assignment operator
ClpArchiveDecoder(ClpArchiveDecoder&&) noexcept;
auto operator=(ClpArchiveDecoder&&) noexcept -> ClpArchiveDecoder&;

/**
* Decodes and returns the next log event.
*
* @return A result containing the next decoded `LogEvent`, or `std::nullopt` if all log events
* have already been consumed.
*/
[[nodiscard]] auto get_next_log_event()
-> ystdlib::error_handling::Result<std::optional<LogEvent>>;

/**
* Decodes all remaining log events and caches them in memory.
*
* Subsequent calls return the same cached decoded events without re decoding them.
*
* @return A result containing a span over all decoded log events, or an error code if decoding
* fails:
* - `SfaErrorCodeEnum::Failure` if decoding fails.
* - `SfaErrorCodeEnum::NoMemory` if memory allocation during decoding fails.
*/
[[nodiscard]] auto collect_log_events()
-> ystdlib::error_handling::Result<std::span<LogEvent const>>;

private:
// Constructor
/**
* Constructs a decoder from preloaded schema tables.
*
* @param tables Schema readers used to decode log events.
* @param has_log_order Whether log event ordering metadata is available.
*/
ClpArchiveDecoder(
std::vector<std::shared_ptr<clp_s::SchemaReader>>&& tables,
bool has_log_order
)
: m_tables{std::move(tables)},
m_has_log_order{has_log_order} {}

// Methods
/**
* Cleans up underlying resources.
*/
auto close() noexcept -> void;

/**
* Moves owned state from `rhs` into this object and resets `rhs`.
*
* @param rhs Object to move state from.
*/
auto move_from(ClpArchiveDecoder& rhs) noexcept -> void;

/**
* Decodes the next log event from `table` and appends it to the internal cache.
*
* @param table Schema reader to decode from.
* @return `true` if a log event was decoded and appended, or `false` if the table has no more
* log events.
*/
[[nodiscard]] auto append_next_log_event(clp_s::SchemaReader& table) -> bool;

/**
* Decodes the next available log event without enforcing global log event order.
*
* @return `true` if a log event was decoded, or `false` if no more log events remain.
*/
[[nodiscard]] auto decode_next_log_event() -> bool;

/**
* Decodes the next available log event in ascending log event index order.
*
* @return `true` if a log event was decoded, or `false` if no more log events remain.
*/
[[nodiscard]] auto decode_next_log_event_in_order() -> bool;

// Members
std::vector<std::shared_ptr<clp_s::SchemaReader>> m_tables;
bool m_has_log_order{false};
std::vector<LogEvent> m_log_events;
};
} // namespace clp_s::ffi::sfa

#endif // CLP_S_FFI_SFA_CLPARCHIVEDECODER_HPP
9 changes: 9 additions & 0 deletions components/core/src/clp_s/ffi/sfa/ClpArchiveReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
#include <clp_s/ffi/sfa/SfaErrorCode.hpp>
#include <clp_s/InputConfig.hpp>

#include "ClpArchiveDecoder.hpp"

namespace clp_s::ffi::sfa {
template <typename ReturnType>
using Result = ystdlib::error_handling::Result<ReturnType>;
Expand Down Expand Up @@ -144,6 +146,13 @@ auto ClpArchiveReader::precompute_archive_metadata() -> Result<void> {
m_file_infos.emplace_back(filename, start_idx, end_idx);
}

m_archive_reader->read_dictionaries_and_metadata();
m_archive_reader->open_packed_streams();

return ystdlib::error_handling::success();
}

auto ClpArchiveReader::decode_all() -> Result<ClpArchiveDecoder> {
return ClpArchiveDecoder::create(*this);
}
} // namespace clp_s::ffi::sfa
16 changes: 15 additions & 1 deletion components/core/src/clp_s/ffi/sfa/ClpArchiveReader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@

#include <ystdlib/error_handling/Result.hpp>

#include "ClpArchiveDecoder.hpp"
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this be a forward declaration instead?


namespace clp_s {
// Forward include
// Forward declaration
class ArchiveReader;
} // namespace clp_s

Expand Down Expand Up @@ -101,7 +103,19 @@ class ClpArchiveReader {
*/
[[nodiscard]] auto get_file_infos() const -> std::vector<FileInfo> { return m_file_infos; }

/**
* Decodes all log events from the archive.
*
* @return A result containing the newly constructed `ClpArchiveDecoder` on success, or an
* error code indicating the failure:
* - Forwards `ClpArchiveDecoder::create`'s return values on failure.
*/
[[nodiscard]] auto decode_all() -> ystdlib::error_handling::Result<ClpArchiveDecoder>;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. Is this function const?
  2. Is the argument of create const?
    [[nodiscard]] static auto create(ClpArchiveReader const& reader)
  3. Can we return
ClpArchiveDecoder{
                m_archive_reader->read_all_tables(),
                m_archive_reader->has_log_order()
}

inside auto decode_all() directly to avoid friend class ClpArchiveDecoder?


private:
// Allows `ClpArchiveDecoder` to reuse the native reader.
friend class ClpArchiveDecoder;

// Constructors
explicit ClpArchiveReader(
std::unique_ptr<clp_s::ArchiveReader> reader,
Expand Down
29 changes: 29 additions & 0 deletions components/core/src/clp_s/ffi/sfa/LogEvent.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
#ifndef CLP_S_FFI_SFA_LOGEVENT_HPP
#define CLP_S_FFI_SFA_LOGEVENT_HPP

#include <cstdint>
#include <string>
#include <utility>

namespace clp_s::ffi::sfa {
class LogEvent {
public:
LogEvent(std::string message, int64_t timestamp, int64_t log_event_idx)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
LogEvent(std::string message, int64_t timestamp, int64_t log_event_idx)
explicit LogEvent(std::string message, int64_t timestamp, int64_t log_event_idx)

: m_message{std::move(message)},
m_timestamp{timestamp},
m_log_event_idx{log_event_idx} {}

[[nodiscard]] auto get_message() const -> std::string const& { return m_message; }

[[nodiscard]] auto get_timestamp() const -> int64_t { return m_timestamp; }

[[nodiscard]] auto get_log_event_idx() const -> int64_t { return m_log_event_idx; }

private:
std::string m_message;
int64_t m_timestamp{0};
int64_t m_log_event_idx{0};
};
} // namespace clp_s::ffi::sfa

#endif // CLP_S_FFI_SFA_LOGEVENT_HPP
2 changes: 2 additions & 0 deletions components/core/src/clp_s/ffi/sfa/SfaErrorCode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ auto SfaErrorCategory::name() const noexcept -> char const* {
template <>
auto SfaErrorCategory::message(SfaErrorCodeEnum error_enum) const -> std::string {
switch (error_enum) {
case SfaErrorCodeEnum::Failure:
return "the operation failed";
case SfaErrorCodeEnum::IoFailure:
return "an I/O operation failed";
case SfaErrorCodeEnum::NoMemory:
Expand Down
1 change: 1 addition & 0 deletions components/core/src/clp_s/ffi/sfa/SfaErrorCode.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ namespace clp_s::ffi::sfa {
* Error code enum for SFA API operations.
*/
enum class SfaErrorCodeEnum : uint8_t {
Failure,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about

Suggested change
Failure,
DecodeFailure,

IoFailure,
NoMemory,
NotInit,
Expand Down
Loading