Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
be961ea
Worker: New Consumer class handling all possible producer stream modes.
jmillan Feb 11, 2026
0caec34
Use qualified names everywhere
jmillan Feb 16, 2026
f30ee9f
Remove unused GetRtpStreams() method
jmillan Feb 17, 2026
6ea2714
Fix: call packet->logger.Discarded() with the proper reason
jmillan Feb 17, 2026
b185abf
Fix: Restore IsActive() logic
jmillan Feb 17, 2026
a6d0a0b
Tests: TestSimpleProducerStreamManager
jmillan Feb 18, 2026
38d121c
clang-tidy fixes
jmillan Feb 18, 2026
7039bcd
Enhance tests, use Factory() to create RTP Packet
jmillan Feb 18, 2026
1ba9d01
feedback
jmillan Feb 18, 2026
d4e26bd
Tests: TestSimulcastProducerStreamManager
jmillan Feb 19, 2026
2e9eead
Merge branch 'v3' into new_consumer
ibc Feb 24, 2026
6f46922
Merge remote-tracking branch 'origin/v3' into new_consumer
jmillan Mar 20, 2026
f4b97b5
TestSvcProducerStreamManager
jmillan Mar 20, 2026
0b51fdd
clang-tidy
jmillan Mar 20, 2026
bf6d221
make format
jmillan Mar 20, 2026
8d61406
Update worker/src/RTC/SimulcastProducerStreamManager.cpp
jmillan Mar 20, 2026
d72b366
Update worker/src/RTC/SimulcastProducerStreamManager.cpp
jmillan Mar 20, 2026
5f765fc
Update worker/src/RTC/SvcProducerStreamManager.cpp
jmillan Mar 20, 2026
3e9ac66
Update worker/src/RTC/SvcProducerStreamManager.cpp
jmillan Mar 20, 2026
6e60816
Update worker/src/RTC/SvcProducerStreamManager.cpp
jmillan Mar 20, 2026
971ac41
format
jmillan Mar 20, 2026
2785458
tidy
jmillan Mar 20, 2026
0086b99
Do not make listener const
jmillan Mar 20, 2026
52271fd
cosmetic
jmillan Mar 20, 2026
6b079f2
cosmetic
jmillan Mar 20, 2026
cd9b84c
WIP Consumer pipe-able
jmillan Mar 23, 2026
feb84bb
cosmetic
jmillan Mar 23, 2026
973d1fd
Merge remote-tracking branch 'origin/v3' into new_consumer
jmillan Mar 23, 2026
d9113dd
Merge remote-tracking branch 'origin/v3' into new_consumer
jmillan May 15, 2026
f70f31e
Adapt to Shared rework
jmillan May 15, 2026
ef73cff
fix tests after merge
jmillan May 15, 2026
bfb56eb
more Shared related changes
jmillan May 15, 2026
0d8100b
Fixes
jmillan May 15, 2026
92fd229
PipeProducerStreamManager
jmillan May 15, 2026
51cb67e
format
jmillan May 15, 2026
ef22ed4
fix compilation
jmillan May 18, 2026
91b3550
tidy
jmillan May 18, 2026
8b689d4
remove unused imports
jmillan May 18, 2026
c7bc385
revert local change
jmillan May 18, 2026
5d3cd88
.clang-format: remove unknown key BreakAfterOpenBracketBracedList
jmillan May 18, 2026
f72eda3
tidy
jmillan May 18, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions worker/.clang-format
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,6 @@ AlwaysBreakBeforeMultilineStrings: true
AlwaysBreakTemplateDeclarations: Yes
BinPackArguments: false
BinPackParameters: false
# NOTE: This doesn't do anything because it requires Cpp11BracedListStyle: true,
# which we don't want.
BreakAfterOpenBracketBracedList: true
BraceWrapping:
AfterClass: true
AfterControlStatement: Always
Expand Down
125 changes: 80 additions & 45 deletions worker/include/RTC/Consumer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@
#define MS_RTC_CONSUMER_HPP

#include "common.hpp"
#include "Shared.hpp"
#include "SharedInterface.hpp"
#include "Channel/ChannelRequest.hpp"
#include "Channel/ChannelSocket.hpp"
#include "FBS/consumer.h"
#include "FBS/transport.h"
#include "RTC/ConsumerTypes.hpp"
#include "RTC/ProducerStreamManager.hpp"
#include "RTC/RTCP/CompoundPacket.hpp"
#include "RTC/RTCP/FeedbackRtpNack.hpp"
#include "RTC/RTCP/ReceiverReport.hpp"
Expand All @@ -17,14 +19,22 @@
#include "RTC/RTP/RtpStreamSend.hpp"
#include "RTC/RTP/SharedPacket.hpp"
#include "RTC/RtpDictionaries.hpp"
#include "RTC/SeqManager.hpp"
#include <absl/container/flat_hash_set.h>
#include <map>
#include <memory>
#include <string>
#include <vector>

namespace RTC
{
class Consumer : public Channel::ChannelSocket::RequestHandler
class Consumer : public Channel::ChannelSocket::RequestHandler,
public RTC::RTP::RtpStreamSend::Listener,
Comment thread
ibc marked this conversation as resolved.
public RTC::ProducerStreamManager::Listener
{
using RetransmissionBuffer =
std::map<uint16_t, RTC::RTP::SharedPacket, RTC::SeqManager<uint16_t>::SeqLowerThan>;

public:
class Listener
{
Expand Down Expand Up @@ -56,20 +66,18 @@ namespace RTC
const std::string& id,
const std::string& producerId,
RTC::Consumer::Listener* listener,
const FBS::Transport::ConsumeRequest* data,
RTC::RtpParameters::Type type);
const FBS::Transport::ConsumeRequest* data);
~Consumer() override;

public:
flatbuffers::Offset<FBS::Consumer::BaseConsumerDump> FillBuffer(
flatbuffers::Offset<FBS::Consumer::DumpResponse> FillBuffer(
flatbuffers::FlatBufferBuilder& builder) const;
flatbuffers::Offset<FBS::Consumer::BaseConsumerDump> FillBufferBase(
flatbuffers::FlatBufferBuilder& builder) const;
flatbuffers::Offset<FBS::Consumer::GetStatsResponse> FillBufferStats(
flatbuffers::FlatBufferBuilder& builder);
flatbuffers::Offset<FBS::Consumer::ConsumerScore> FillBufferScore(
flatbuffers::FlatBufferBuilder& builder) const;
virtual flatbuffers::Offset<FBS::Consumer::GetStatsResponse> FillBufferStats(
flatbuffers::FlatBufferBuilder& builder) = 0;
virtual flatbuffers::Offset<FBS::Consumer::ConsumerScore> FillBufferScore(
flatbuffers::FlatBufferBuilder& /*builder*/) const
{
return 0;
};
RTC::Media::Kind GetKind() const
{
return this->kind;
Expand All @@ -86,12 +94,9 @@ namespace RTC
{
return this->type;
}
virtual RTC::ConsumerTypes::VideoLayers GetPreferredLayers() const
RTC::ConsumerTypes::VideoLayers GetPreferredLayers() const
{
// By default return 1:1.
RTC::ConsumerTypes::VideoLayers layers;

return layers;
return this->producerStreamManager->GetPreferredLayers();
}
const std::vector<uint32_t>& GetMediaSsrcs() const
{
Expand All @@ -101,10 +106,8 @@ namespace RTC
{
return this->rtxSsrcs;
}
virtual bool IsActive() const
bool IsActive() const override
{
// The parent Consumer just checks whether Consumer and Producer are
// not paused and the transport connected.
// clang-format off
return (
this->transportConnected &&
Expand All @@ -126,32 +129,30 @@ namespace RTC
}
void ProducerPaused();
void ProducerResumed();
virtual void ProducerRtpStream(RTC::RTP::RtpStreamRecv* rtpStream, uint32_t mappedSsrc) = 0;
virtual void ProducerNewRtpStream(RTC::RTP::RtpStreamRecv* rtpStream, uint32_t mappedSsrc) = 0;
void ProducerRtpStream(RTC::RTP::RtpStreamRecv* rtpStream, uint32_t mappedSsrc);
void ProducerNewRtpStream(RTC::RTP::RtpStreamRecv* rtpStream, uint32_t mappedSsrc);
void ProducerRtpStreamScores(const std::vector<uint8_t>* scores);
virtual void ProducerRtpStreamScore(
RTC::RTP::RtpStreamRecv* rtpStream, uint8_t score, uint8_t previousScore) = 0;
virtual void ProducerRtcpSenderReport(RTC::RTP::RtpStreamRecv* rtpStream, bool first) = 0;
void ProducerRtpStreamScore(RTC::RTP::RtpStreamRecv* rtpStream, uint8_t score, uint8_t previousScore);
void ProducerRtcpSenderReport(RTC::RTP::RtpStreamRecv* rtpStream, bool first);
void ProducerClosed();
void SetExternallyManagedBitrate()
{
this->externallyManagedBitrate = true;
this->producerStreamManager->SetExternallyManagedBitrate();
}
virtual uint8_t GetBitratePriority() const = 0;
virtual uint32_t IncreaseLayer(uint32_t bitrate, bool considerLoss) = 0;
virtual void ApplyLayers() = 0;
virtual uint32_t GetDesiredBitrate() const = 0;
virtual void SendRtpPacket(RTC::RTP::Packet* packet, RTC::RTP::SharedPacket& sharedPacket) = 0;
virtual bool GetRtcp(RTC::RTCP::CompoundPacket* packet, uint64_t nowMs) = 0;
virtual const std::vector<RTC::RTP::RtpStreamSend*>& GetRtpStreams() const = 0;
virtual void NeedWorstRemoteFractionLost(uint32_t mappedSsrc, uint8_t& worstRemoteFractionLost) = 0;
virtual void ReceiveNack(RTC::RTCP::FeedbackRtpNackPacket* nackPacket) = 0;
virtual void ReceiveKeyFrameRequest(
RTC::RTCP::FeedbackPs::MessageType messageType, uint32_t ssrc) = 0;
virtual void ReceiveRtcpReceiverReport(RTC::RTCP::ReceiverReport* report) = 0;
virtual void ReceiveRtcpXrReceiverReferenceTime(RTC::RTCP::ReceiverReferenceTime* report) = 0;
virtual uint32_t GetTransmissionRate(uint64_t nowMs) = 0;
virtual float GetRtt() const = 0;
uint8_t GetBitratePriority() const;
uint32_t IncreaseLayer(uint32_t bitrate, bool considerLoss);
void ApplyLayers();
uint32_t GetDesiredBitrate() const;
void SendRtpPacket(RTC::RTP::Packet* packet, RTC::RTP::SharedPacket& sharedPacket);
bool GetRtcp(RTC::RTCP::CompoundPacket* packet, uint64_t nowMs);
void NeedWorstRemoteFractionLost(uint32_t mappedSsrc, uint8_t& worstRemoteFractionLost);
void ReceiveNack(RTC::RTCP::FeedbackRtpNackPacket* nackPacket);
void ReceiveKeyFrameRequest(RTC::RTCP::FeedbackPs::MessageType messageType, uint32_t ssrc);
void ReceiveRtcpReceiverReport(RTC::RTCP::ReceiverReport* report);
void ReceiveRtcpXrReceiverReferenceTime(RTC::RTCP::ReceiverReferenceTime* report);
uint32_t GetTransmissionRate(uint64_t nowMs);
float GetRtt() const;

/* Methods inherited from Channel::ChannelSocket::RequestHandler. */
public:
Expand All @@ -165,10 +166,35 @@ namespace RTC
void EmitTraceEvent(flatbuffers::Offset<FBS::Consumer::TraceNotification>& notification) const;

private:
virtual void UserOnTransportConnected() = 0;
virtual void UserOnTransportDisconnected() = 0;
virtual void UserOnPaused() = 0;
virtual void UserOnResumed() = 0;
void EmitScore() const;
void EmitLayersChange() const;

private:
void UserOnTransportConnected();
void UserOnTransportDisconnected();
void UserOnPaused();
void UserOnResumed();

private:
void CreateRtpStreams();
static void StorePacketInTargetLayerRetransmissionBuffer(
RetransmissionBuffer& targetLayerRetransmissionBuffer,
RTC::RTP::Packet* packet,
RTC::RTP::SharedPacket& sharedPacket);

/* Pure virtual methods inherited from RtpStreamSend::Listener. */
public:
void OnRtpStreamScore(RTC::RTP::RtpStream* rtpStream, uint8_t score, uint8_t previousScore) override;
void OnRtpStreamRetransmitRtpPacket(
RTC::RTP::RtpStreamSend* rtpStream, RTC::RTP::Packet* packet) override;

/* Pure virtual methods inherited from ProducerStreamManager::Listener. */
public:
void OnProducerStreamManagerKeyFrameRequested(uint32_t mappedSsrc) override;
void OnProducerStreamManagerNeedBitrateChange() override;
void OnProducerStreamManagerLayersChanged() override;
void OnProducerStreamManagerClearRetransmissionBuffer() override;
void OnProducerStreamManagerScore() override;

public:
// Passed by argument.
Expand All @@ -186,8 +212,6 @@ namespace RTC
struct RTC::RTP::HeaderExtensionIds rtpHeaderExtensionIds;
const std::vector<uint8_t>* producerRtpStreamScores{ nullptr };
// Others.
// Whether a payload type is supported or not is represented in the
// corresponding position of the bitset.
std::bitset<128u> supportedCodecPayloadTypes;
uint64_t lastRtcpSentTime{ 0u };
uint16_t maxRtcpInterval{ 0u };
Expand All @@ -196,13 +220,24 @@ namespace RTC
struct TraceEventTypes traceEventTypes;

private:
bool pipe{ false };
// Others.
std::vector<RTC::RTP::RtpStreamSend*> rtpStreams;
Comment thread
jmillan marked this conversation as resolved.
absl::flat_hash_map<uint32_t, uint32_t> mapMappedSsrcSsrc;
absl::flat_hash_map<uint32_t, RTC::RTP::RtpStreamSend*> mapSsrcRtpStream;
absl::flat_hash_map<RTC::RTP::RtpStreamSend*, RTC::SeqManager<uint16_t>> mapRtpStreamRtpSeqManager;
// Buffers to store packets that arrive earlier than the first packet of the
// video key frame.
absl::flat_hash_map<RTC::RTP::RtpStreamSend*, RetransmissionBuffer>
mapRtpStreamTargetLayerRetransmissionBuffer;
std::vector<uint32_t> mediaSsrcs;
std::vector<uint32_t> rtxSsrcs;
bool transportConnected{ false };
bool paused{ false };
bool producerPaused{ false };
bool producerClosed{ false };
bool lastSentPacketHasMarker{ false };
std::unique_ptr<RTC::ProducerStreamManager> producerStreamManager;
};
} // namespace RTC

Expand Down
Loading
Loading