Skip to content
Draft
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions worker/fuzzer/src/RTC/FuzzerRtpPacket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -179,9 +179,7 @@ void Fuzzer::RTC::RtpPacket::Fuzz(const uint8_t* data, size_t len)
packet->GetPayloadPadding();
packet->IsKeyFrame();

auto* clonedPacket = packet->Clone();

delete clonedPacket;
auto clonedPacket = packet->Clone();

// TODO: packet->RtxEncode(); // This cannot be tested this way.
// TODO: packet->RtxDecode(); // This cannot be tested this way.
Expand All @@ -190,5 +188,5 @@ void Fuzzer::RTC::RtpPacket::Fuzz(const uint8_t* data, size_t len)
// TODO: packet->ProcessPayload();
// TODO: packet->ShiftPayload();

delete packet;
::RTC::RtpPacket::Deallocate(packet);
}
3 changes: 2 additions & 1 deletion worker/fuzzer/src/RTC/FuzzerRtpStreamSend.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,5 +60,6 @@ void Fuzzer::RTC::RtpStreamSend::Fuzz(const uint8_t* data, size_t len)
}

delete stream;
delete packet;

::RTC::RtpPacket::Deallocate(packet);
}
3 changes: 2 additions & 1 deletion worker/include/RTC/DirectTransport.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ namespace RTC
void SendRtpPacket(
RTC::Consumer* consumer,
RTC::RtpPacket* packet,
RTC::Transport::onSendCallback* cb = nullptr) override;
RTC::Transport::onSendCallback* cb = nullptr,
RTC::Transport::OnSendCallbackCtx* ctx = nullptr) override;
void SendRtcpPacket(RTC::RTCP::Packet* packet) override;
void SendRtcpCompoundPacket(RTC::RTCP::CompoundPacket* packet) override;
void SendMessage(
Expand Down
3 changes: 2 additions & 1 deletion worker/include/RTC/PipeTransport.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ namespace RTC
void SendRtpPacket(
RTC::Consumer* consumer,
RTC::RtpPacket* packet,
RTC::Transport::onSendCallback* cb = nullptr) override;
RTC::Transport::onSendCallback* cb = nullptr,
RTC::Transport::OnSendCallbackCtx* ctx = nullptr) override;
void SendRtcpPacket(RTC::RTCP::Packet* packet) override;
void SendRtcpCompoundPacket(RTC::RTCP::CompoundPacket* packet) override;
void SendMessage(
Expand Down
3 changes: 2 additions & 1 deletion worker/include/RTC/PlainTransport.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ namespace RTC
void SendRtpPacket(
RTC::Consumer* consumer,
RTC::RtpPacket* packet,
RTC::Transport::onSendCallback* cb = nullptr) override;
RTC::Transport::onSendCallback* cb = nullptr,
RTC::Transport::OnSendCallbackCtx* ctx = nullptr) override;
void SendRtcpPacket(RTC::RTCP::Packet* packet) override;
void SendRtcpCompoundPacket(RTC::RTCP::CompoundPacket* packet) override;
void SendMessage(
Expand Down
16 changes: 12 additions & 4 deletions worker/include/RTC/RtpPacket.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,15 @@ namespace RTC
class RtpPacket
{
public:
using Allocator = Utils::ObjectPoolAllocator<RtpPacket>;
using AllocatorTraits = std::allocator_traits<Allocator>;
// Memory to hold the cloned packet (with extra space for RTX encoding).
using RtpPacketBuffer = std::array<uint8_t, MtuSize + 100>;
using BufferAllocator = Utils::ObjectPoolAllocator<RtpPacket::RtpPacketBuffer>;
using BufferAllocatorTraits = std::allocator_traits<BufferAllocator>;

static void Deallocate(RtpPacket* packet);

/* Struct for RTP header. */
struct Header
{
Expand Down Expand Up @@ -133,7 +142,7 @@ namespace RTC

static RtpPacket* Parse(const uint8_t* data, size_t len);

private:
public:
Comment thread
jmillan marked this conversation as resolved.
RtpPacket(
Header* header,
HeaderExtension* headerExtension,
Expand All @@ -142,7 +151,6 @@ namespace RTC
uint8_t payloadPadding,
size_t size);

public:
~RtpPacket();

void Dump() const;
Expand Down Expand Up @@ -589,7 +597,7 @@ namespace RTC
return this->payloadDescriptorHandler->IsKeyFrame();
}

RtpPacket* Clone() const;
std::shared_ptr<RtpPacket> Clone() const;

void RtxEncode(uint8_t payloadType, uint32_t ssrc, uint16_t seq);

Expand Down Expand Up @@ -635,7 +643,7 @@ namespace RTC
std::shared_ptr<Codecs::PayloadDescriptorHandler> payloadDescriptorHandler;
// Buffer where this packet is allocated, can be `nullptr` if packet was
// parsed from externally provided buffer.
uint8_t* buffer{ nullptr };
RtpPacketBuffer* buffer{ nullptr };
};
} // namespace RTC

Expand Down
3 changes: 3 additions & 0 deletions worker/include/RTC/RtpStreamSend.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ namespace RTC
public:
struct StorageItem
{
using Allocator = Utils::ObjectPoolAllocator<RtpStreamSend::StorageItem>;
using AllocatorTraits = std::allocator_traits<Allocator>;

void Reset();

// Original packet.
Expand Down
6 changes: 5 additions & 1 deletion worker/include/RTC/TcpConnection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,11 @@ namespace RTC
~TcpConnection() override;

public:
void Send(const uint8_t* data, size_t len, ::TcpConnectionHandler::onSendCallback* cb);
void Send(
const uint8_t* data,
size_t len,
RTC::Transport::onSendCallback* cb,
RTC::Transport::OnSendCallbackCtx* ctx);

/* Pure virtual methods inherited from ::TcpConnectionHandler. */
public:
Expand Down
29 changes: 27 additions & 2 deletions worker/include/RTC/Transport.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,31 @@ namespace RTC
public Timer::Listener
{
protected:
using onSendCallback = const std::function<void(bool sent)>;
using onQueuedCallback = const std::function<void(bool queued, bool sctpSendBufferFull)>;

public:
#ifdef ENABLE_RTC_SENDER_BANDWIDTH_ESTIMATOR
struct OnSendCallbackCtx
{
RTC::TransportCongestionControlClient* tccClient;
webrtc::RtpPacketSendInfo packetInfo;
RTC::SenderBandwidthEstimator* senderBwe;
RTC::SenderBandwidthEstimator::SentInfo sentInfo;
};
#else
struct OnSendCallbackCtx
{
using Allocator = Utils::ObjectPoolAllocator<Transport::OnSendCallbackCtx>;
using AllocatorTraits = std::allocator_traits<Allocator>;

RTC::TransportCongestionControlClient* tccClient;
webrtc::RtpPacketSendInfo packetInfo;
};
#endif
// This function MUST NOT be de-allocated manually and MUST be called EXACTLY once.
static void OnSendCallback(bool sent, OnSendCallbackCtx* ctx);
using onSendCallback = void(bool sent, OnSendCallbackCtx* ctx);

public:
class Listener
{
Expand Down Expand Up @@ -170,7 +192,10 @@ namespace RTC
private:
virtual bool IsConnected() const = 0;
virtual void SendRtpPacket(
RTC::Consumer* consumer, RTC::RtpPacket* packet, onSendCallback* cb = nullptr) = 0;
RTC::Consumer* consumer,
RTC::RtpPacket* packet,
onSendCallback* cb = nullptr,
OnSendCallbackCtx* ctx = nullptr) = 0;
void HandleRtcpPacket(RTC::RTCP::Packet* packet);
void SendRtcp(uint64_t nowMs);
virtual void SendRtcpPacket(RTC::RTCP::Packet* packet) = 0;
Expand Down
14 changes: 8 additions & 6 deletions worker/include/RTC/TransportTuple.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include "common.hpp"
#include "Utils.hpp"
#include "RTC/TcpConnection.hpp"
#include "RTC/Transport.hpp"
#include "RTC/UdpSocket.hpp"
#include <nlohmann/json.hpp>
#include <string>
Expand All @@ -14,9 +15,6 @@ namespace RTC
{
class TransportTuple
{
protected:
using onSendCallback = const std::function<void(bool sent)>;

public:
enum class Protocol
{
Expand Down Expand Up @@ -85,12 +83,16 @@ namespace RTC
this->localAnnouncedIp = localAnnouncedIp;
}

void Send(const uint8_t* data, size_t len, RTC::TransportTuple::onSendCallback* cb = nullptr)
void Send(
const uint8_t* data,
size_t len,
Transport::onSendCallback* cb = nullptr,
Transport::OnSendCallbackCtx* ctx = nullptr)
{
if (this->protocol == Protocol::UDP)
this->udpSocket->Send(data, len, this->udpRemoteAddr, cb);
this->udpSocket->Send(data, len, this->udpRemoteAddr, cb, ctx);
else
this->tcpConnection->Send(data, len, cb);
this->tcpConnection->Send(data, len, cb, ctx);
}

Protocol GetProtocol() const
Expand Down
3 changes: 2 additions & 1 deletion worker/include/RTC/WebRtcTransport.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ namespace RTC
void SendRtpPacket(
RTC::Consumer* consumer,
RTC::RtpPacket* packet,
RTC::Transport::onSendCallback* cb = nullptr) override;
RTC::Transport::onSendCallback* cb = nullptr,
RTC::Transport::OnSendCallbackCtx* ctx = nullptr) override;
void SendRtcpPacket(RTC::RTCP::Packet* packet) override;
void SendRtcpCompoundPacket(RTC::RTCP::CompoundPacket* packet) override;
void SendMessage(
Expand Down
81 changes: 81 additions & 0 deletions worker/include/Utils.hpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,14 @@
#ifndef MS_UTILS_HPP
#define MS_UTILS_HPP

#ifndef MS_CLASS
#define MS_CLASS "Utils"
Comment thread
jmillan marked this conversation as resolved.
Outdated
#endif

// #define MS_MEM_POOL_FREE_ON_RETURN 1

#include "common.hpp"
#include "Logger.hpp"
#include <openssl/evp.h>
#include <cmath>
#include <cstring> // std::memcmp(), std::memcpy()
Expand Down Expand Up @@ -353,6 +360,80 @@ namespace Utils
return false;
}
};

// Simple implementation of object pool only for single objects.
// Arrays are allocated as usual.
template<typename T>
class ObjectPoolAllocator
{
std::shared_ptr<std::vector<T*>> pool_data;

public:
typedef T value_type;
thread_local static Utils::ObjectPoolAllocator<T> Pool;

ObjectPoolAllocator()
{
pool_data = std::shared_ptr<std::vector<T*>>(
new std::vector<T*>(),
[](std::vector<T*>* pool)
{
for (auto* ptr : *pool)
{
std::free(ptr);
}
delete pool;
});
}

template<typename U>
ObjectPoolAllocator(const ObjectPoolAllocator<U>& other)
: pool_data(ObjectPoolAllocator<T>::Pool.pool_data)
{
}

~ObjectPoolAllocator()
{
}

T* allocate(size_t n)
{
MS_ASSERT(n == 1, "only single object can be allocated");

if (this->pool_data->empty())
{
return static_cast<T*>(std::malloc(sizeof(T)));
}

T* ptr = this->pool_data->back();
this->pool_data->pop_back();

return ptr;
}

void deallocate(T* ptr, size_t n)
{
if (!ptr)
{
return;
}

if (n > 1)
{
std::free(ptr);
return;
}

#ifdef MS_MEM_POOL_FREE_ON_RETURN
std::free(ptr);
#else
this->pool_data->push_back(ptr);
#endif
}
};

template<typename T>
thread_local Utils::ObjectPoolAllocator<T> Utils::ObjectPoolAllocator<T>::Pool;
} // namespace Utils

#endif
10 changes: 6 additions & 4 deletions worker/include/handles/TcpConnectionHandler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#define MS_TCP_CONNECTION_HPP

#include "common.hpp"
#include "RTC/Transport.hpp"
#include <uv.h>
#include <string>

Expand Down Expand Up @@ -35,12 +36,12 @@ class TcpConnectionHandler
~UvWriteData()
{
delete[] this->store;
delete this->cb;
}

uv_write_t req;
uint8_t* store{ nullptr };
TcpConnectionHandler::onSendCallback* cb{ nullptr };
RTC::Transport::onSendCallback* cb{ nullptr };
RTC::Transport::OnSendCallbackCtx* ctx{ nullptr };
};

public:
Expand Down Expand Up @@ -71,7 +72,8 @@ class TcpConnectionHandler
size_t len1,
const uint8_t* data2,
size_t len2,
TcpConnectionHandler::onSendCallback* cb);
RTC::Transport::onSendCallback* cb,
RTC::Transport::OnSendCallbackCtx* ctx);
void ErrorReceiving();
const struct sockaddr* GetLocalAddress() const
{
Expand Down Expand Up @@ -117,7 +119,7 @@ class TcpConnectionHandler
public:
void OnUvReadAlloc(size_t suggestedSize, uv_buf_t* buf);
void OnUvRead(ssize_t nread, const uv_buf_t* buf);
void OnUvWrite(int status, onSendCallback* cb);
void OnUvWrite(int status, RTC::Transport::onSendCallback* cb, RTC::Transport::OnSendCallbackCtx* ctx);

/* Pure virtual methods that must be implemented by the subclass. */
protected:
Expand Down
Loading