[feat] enable congestion controller

This commit is contained in:
dijunkun
2025-02-07 17:42:05 +08:00
parent 316a0220a8
commit 8d7068aa32
32 changed files with 184 additions and 46 deletions

View File

@@ -12,8 +12,10 @@ AudioChannelSend::AudioChannelSend(
: ice_agent_(ice_agent), ice_io_statistics_(ice_io_statistics) {}
void AudioChannelSend::Initialize(rtp::PAYLOAD_TYPE payload_type) {
rtp_packetizer_ = RtpPacketizer::Create(payload_type);
rtp_audio_sender_ = std::make_unique<RtpAudioSender>(ice_io_statistics_);
rtp_packetizer_ =
RtpPacketizer::Create(payload_type, rtp_audio_sender_->GetSsrc());
rtp_audio_sender_->SetSendDataFunc(
[this](const char *data, size_t size) -> int {
if (!ice_agent_) {

View File

@@ -12,8 +12,10 @@ DataChannelSend::DataChannelSend(
: ice_agent_(ice_agent), ice_io_statistics_(ice_io_statistics) {}
void DataChannelSend::Initialize(rtp::PAYLOAD_TYPE payload_type) {
rtp_packetizer_ = RtpPacketizer::Create(payload_type);
rtp_data_sender_ = std::make_unique<RtpDataSender>(ice_io_statistics_);
rtp_packetizer_ =
RtpPacketizer::Create(payload_type, rtp_data_sender_->GetSsrc());
rtp_data_sender_->SetSendDataFunc(
[this](const char *data, size_t size) -> int {
if (!ice_agent_) {

View File

@@ -2,6 +2,7 @@
#include <chrono>
#include "common.h"
#include "log.h"
#define RTCP_SR_INTERVAL 1000
@@ -9,7 +10,7 @@
RtpAudioSender::RtpAudioSender() { SetPeriod(std::chrono::milliseconds(5)); }
RtpAudioSender::RtpAudioSender(std::shared_ptr<IOStatistics> io_statistics)
: io_statistics_(io_statistics) {
: ssrc_(GenerateUniqueSsrc()), io_statistics_(io_statistics) {
SetPeriod(std::chrono::milliseconds(5));
}
@@ -17,6 +18,8 @@ RtpAudioSender::~RtpAudioSender() {
if (rtp_statistics_) {
rtp_statistics_->Stop();
}
SSRCManager::Instance().DeleteSsrc(ssrc_);
}
void RtpAudioSender::Enqueue(std::vector<RtpPacket>& rtp_packets) {

View File

@@ -25,8 +25,8 @@ class RtpAudioSender : public ThreadBase {
public:
void Enqueue(std::vector<RtpPacket> &rtp_packets);
void SetSendDataFunc(std::function<int(const char *, size_t)> data_send_func);
uint32_t GetSsrc() { return ssrc_; }
private:
private:
int SendRtpPacket(RtpPacket &rtp_packet);
int SendRtcpSR(RtcpSenderReport &rtcp_sr);
@@ -39,6 +39,9 @@ class RtpAudioSender : public ThreadBase {
private:
std::function<int(const char *, size_t)> data_send_func_ = nullptr;
RingBuffer<RtpPacket> rtp_packe_queue_;
private:
uint32_t ssrc_ = 0;
std::unique_ptr<RtpStatistics> rtp_statistics_ = nullptr;
std::shared_ptr<IOStatistics> io_statistics_ = nullptr;
uint32_t last_send_bytes_ = 0;

View File

@@ -2,6 +2,7 @@
#include <chrono>
#include "common.h"
#include "log.h"
#define RTCP_SR_INTERVAL 1000
@@ -9,7 +10,7 @@
RtpDataSender::RtpDataSender() {}
RtpDataSender::RtpDataSender(std::shared_ptr<IOStatistics> io_statistics)
: io_statistics_(io_statistics) {
: ssrc_(GenerateUniqueSsrc()), io_statistics_(io_statistics) {
SetPeriod(std::chrono::milliseconds(5));
}
@@ -17,6 +18,8 @@ RtpDataSender::~RtpDataSender() {
if (rtp_statistics_) {
rtp_statistics_->Stop();
}
SSRCManager::Instance().DeleteSsrc(ssrc_);
}
void RtpDataSender::Enqueue(std::vector<RtpPacket>& rtp_packets) {

View File

@@ -25,6 +25,7 @@ class RtpDataSender : public ThreadBase {
public:
void Enqueue(std::vector<RtpPacket> &rtp_packets);
void SetSendDataFunc(std::function<int(const char *, size_t)> data_send_func);
uint32_t GetSsrc() { return ssrc_; }
private:
private:
@@ -41,6 +42,7 @@ class RtpDataSender : public ThreadBase {
RingBuffer<RtpPacket> rtp_packe_queue_;
private:
uint32_t ssrc_ = 0;
std::unique_ptr<RtpStatistics> rtp_statistics_ = nullptr;
std::shared_ptr<IOStatistics> io_statistics_ = nullptr;
uint32_t last_send_bytes_ = 0;

View File

@@ -9,34 +9,35 @@
#define NV12_BUFFER_SIZE (1280 * 720 * 3 / 2)
#define RTCP_RR_INTERVAL 1000
RtpVideoReceiver::RtpVideoReceiver()
RtpVideoReceiver::RtpVideoReceiver(std::shared_ptr<Clock> clock)
: feedback_ssrc_(GenerateUniqueSsrc()),
active_remb_module_(nullptr),
receive_side_congestion_controller_(
clock_,
clock,
[this](std::vector<std::unique_ptr<RtcpPacket>> packets) {
SendCombinedRtcpPacket(std::move(packets));
},
[this](int64_t bitrate_bps, std::vector<uint32_t> ssrcs) {
SendRemb(bitrate_bps, ssrcs);
}),
clock_(Clock::GetRealTimeClockShared()) {
clock_(clock) {
SetPeriod(std::chrono::milliseconds(5));
// rtcp_thread_ = std::thread(&RtpVideoReceiver::RtcpThread, this);
}
RtpVideoReceiver::RtpVideoReceiver(std::shared_ptr<IOStatistics> io_statistics)
RtpVideoReceiver::RtpVideoReceiver(std::shared_ptr<Clock> clock,
std::shared_ptr<IOStatistics> io_statistics)
: io_statistics_(io_statistics),
feedback_ssrc_(GenerateUniqueSsrc()),
receive_side_congestion_controller_(
clock_,
clock,
[this](std::vector<std::unique_ptr<RtcpPacket>> packets) {
SendCombinedRtcpPacket(std::move(packets));
},
[this](int64_t bitrate_bps, std::vector<uint32_t> ssrcs) {
SendRemb(bitrate_bps, ssrcs);
}),
clock_(Clock::GetRealTimeClockShared()) {
clock_(clock) {
SetPeriod(std::chrono::milliseconds(5));
// rtcp_thread_ = std::thread(&RtpVideoReceiver::RtcpThread, this);
@@ -404,6 +405,21 @@ int RtpVideoReceiver::SendRtcpRR(RtcpReceiverReport& rtcp_rr) {
return 0;
}
TimeDelta AtoToTimeDelta(uint16_t receive_info) {
// receive_info
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// |R|ECN| Arrival time offset |
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
const uint16_t ato = receive_info & 0x1FFF;
if (ato == 0x1FFE) {
return TimeDelta::PlusInfinity();
}
if (ato == 0x1FFF) {
return TimeDelta::MinusInfinity();
}
return TimeDelta::Seconds(ato) / 1024;
}
void RtpVideoReceiver::SendCombinedRtcpPacket(
std::vector<std::unique_ptr<RtcpPacket>> rtcp_packets) {
if (!data_send_func_) {
@@ -414,9 +430,18 @@ void RtpVideoReceiver::SendCombinedRtcpPacket(
RTCPSender rtcp_sender(
[this](const uint8_t* buffer, size_t size) -> int {
webrtc::rtcp::CommonHeader rtcp_block;
// bool valid = true;
// if (!rtcp_block.Parse(buffer, size)) {
// valid = false;
// }
webrtc::rtcp::CongestionControlFeedback feedback;
feedback.Parse(rtcp_block);
return data_send_func_((const char*)buffer, size);
},
IP_PACKET_SIZE);
1200);
for (auto& rtcp_packet : rtcp_packets) {
rtcp_packet->SetSenderSsrc(feedback_ssrc_);

View File

@@ -23,8 +23,9 @@ using namespace webrtc;
class RtpVideoReceiver : public ThreadBase {
public:
RtpVideoReceiver();
RtpVideoReceiver(std::shared_ptr<IOStatistics> io_statistics);
RtpVideoReceiver(std::shared_ptr<Clock> clock);
RtpVideoReceiver(std::shared_ptr<Clock> clock,
std::shared_ptr<IOStatistics> io_statistics);
virtual ~RtpVideoReceiver();
public:

View File

@@ -2,6 +2,7 @@
#include <chrono>
#include "common.h"
#include "log.h"
// #define SAVE_RTP_SENT_STREAM
@@ -11,7 +12,7 @@
RtpVideoSender::RtpVideoSender() {}
RtpVideoSender::RtpVideoSender(std::shared_ptr<IOStatistics> io_statistics)
: io_statistics_(io_statistics) {
: ssrc_(GenerateUniqueSsrc()), io_statistics_(io_statistics) {
SetPeriod(std::chrono::milliseconds(5));
#ifdef SAVE_RTP_SENT_STREAM
file_rtp_sent_ = fopen("rtp_sent_stream.h264", "w+b");
@@ -26,6 +27,8 @@ RtpVideoSender::~RtpVideoSender() {
rtp_statistics_->Stop();
}
SSRCManager::Instance().DeleteSsrc(ssrc_);
#ifdef SAVE_RTP_SENT_STREAM
if (file_rtp_sent_) {
fflush(file_rtp_sent_);
@@ -51,12 +54,26 @@ void RtpVideoSender::SetSendDataFunc(
data_send_func_ = data_send_func;
}
void RtpVideoSender::SetOnSentPacketFunc(
std::function<void(const webrtc::RtpPacketToSend&)> on_sent_packet_func) {
on_sent_packet_func_ = on_sent_packet_func;
}
int RtpVideoSender::SendRtpPacket(RtpPacket& rtp_packet) {
if (!data_send_func_) {
LOG_ERROR("data_send_func_ is nullptr");
return -1;
}
if (on_sent_packet_func_) {
webrtc::RtpPacketToSend rtp_packet_to_send;
rtp_packet_to_send.SetSequenceNumber(rtp_packet.SequenceNumber());
rtp_packet_to_send.SetSsrc(rtp_packet.Ssrc());
rtp_packet_to_send.set_transport_sequence_number(transport_seq_++);
rtp_packet_to_send.set_packet_type(webrtc::RtpPacketMediaType::kVideo);
on_sent_packet_func_(rtp_packet_to_send);
}
if (0 != data_send_func_((const char*)rtp_packet.Buffer().data(),
rtp_packet.Size())) {
// LOG_ERROR("Send rtp packet failed");

View File

@@ -7,6 +7,7 @@
#include "ringbuffer.h"
#include "rtcp_sender_report.h"
#include "rtp_packet.h"
#include "rtp_packet_to_send.h"
#include "rtp_statistics.h"
#include "thread_base.h"
@@ -19,6 +20,9 @@ class RtpVideoSender : public ThreadBase {
public:
void Enqueue(std::vector<RtpPacket> &rtp_packets);
void SetSendDataFunc(std::function<int(const char *, size_t)> data_send_func);
void SetOnSentPacketFunc(
std::function<void(const webrtc::RtpPacketToSend &)> on_sent_packet_func);
uint32_t GetSsrc() { return ssrc_; }
private:
int SendRtpPacket(RtpPacket &rtp_packet);
@@ -31,9 +35,12 @@ class RtpVideoSender : public ThreadBase {
private:
std::function<int(const char *, size_t)> data_send_func_ = nullptr;
std::function<void(const webrtc::RtpPacketToSend &)> on_sent_packet_func_ =
nullptr;
RingBuffer<RtpPacket> rtp_packe_queue_;
private:
uint32_t ssrc_ = 0;
std::unique_ptr<RtpStatistics> rtp_statistics_ = nullptr;
std::shared_ptr<IOStatistics> io_statistics_ = nullptr;
uint32_t last_send_bytes_ = 0;
@@ -41,6 +48,9 @@ class RtpVideoSender : public ThreadBase {
uint32_t total_rtp_payload_sent_ = 0;
uint32_t total_rtp_packets_sent_ = 0;
private:
int64_t transport_seq_ = 0;
private:
FILE *file_rtp_sent_ = nullptr;
};

View File

@@ -5,17 +5,19 @@
VideoChannelReceive::VideoChannelReceive() {}
VideoChannelReceive::VideoChannelReceive(
std::shared_ptr<IceAgent> ice_agent,
std::shared_ptr<webrtc::Clock> clock, std::shared_ptr<IceAgent> ice_agent,
std::shared_ptr<IOStatistics> ice_io_statistics,
std::function<void(VideoFrame &)> on_receive_complete_frame)
: ice_agent_(ice_agent),
ice_io_statistics_(ice_io_statistics),
on_receive_complete_frame_(on_receive_complete_frame) {}
on_receive_complete_frame_(on_receive_complete_frame),
clock_(clock) {}
VideoChannelReceive::~VideoChannelReceive() {}
void VideoChannelReceive::Initialize(rtp::PAYLOAD_TYPE payload_type) {
rtp_video_receiver_ = std::make_unique<RtpVideoReceiver>(ice_io_statistics_);
rtp_video_receiver_ =
std::make_unique<RtpVideoReceiver>(clock_, ice_io_statistics_);
rtp_video_receiver_->SetOnReceiveCompleteFrame(
[this](VideoFrame &video_frame) -> void {
on_receive_complete_frame_(video_frame);

View File

@@ -7,6 +7,7 @@
#ifndef _VIDEO_CHANNEL_RECEIVE_H_
#define _VIDEO_CHANNEL_RECEIVE_H_
#include "clock.h"
#include "ice_agent.h"
#include "rtp_video_receiver.h"
@@ -14,7 +15,7 @@ class VideoChannelReceive {
public:
VideoChannelReceive();
VideoChannelReceive(
std::shared_ptr<IceAgent> ice_agent,
std::shared_ptr<webrtc::Clock> clock, std::shared_ptr<IceAgent> ice_agent,
std::shared_ptr<IOStatistics> ice_io_statistics,
std::function<void(VideoFrame &)> on_receive_complete_frame);
@@ -31,6 +32,9 @@ class VideoChannelReceive {
std::shared_ptr<IOStatistics> ice_io_statistics_ = nullptr;
std::unique_ptr<RtpVideoReceiver> rtp_video_receiver_ = nullptr;
std::function<void(VideoFrame &)> on_receive_complete_frame_ = nullptr;
private:
std::shared_ptr<Clock> clock_;
};
#endif

View File

@@ -1,21 +1,25 @@
#include "video_channel_send.h"
#include "log.h"
#include "rtc_base/network/sent_packet.h"
VideoChannelSend::VideoChannelSend() {}
VideoChannelSend::~VideoChannelSend() {}
VideoChannelSend::VideoChannelSend(
std::shared_ptr<IceAgent> ice_agent,
std::shared_ptr<webrtc::Clock> clock, std::shared_ptr<IceAgent> ice_agent,
std::shared_ptr<IOStatistics> ice_io_statistics)
: ice_agent_(ice_agent), ice_io_statistics_(ice_io_statistics){};
: ice_agent_(ice_agent),
ice_io_statistics_(ice_io_statistics),
clock_(clock){};
void VideoChannelSend::Initialize(rtp::PAYLOAD_TYPE payload_type) {
controller_ = std::make_unique<CongestionControl>();
rtp_packetizer_ = RtpPacketizer::Create(payload_type);
rtp_video_sender_ = std::make_unique<RtpVideoSender>(ice_io_statistics_);
rtp_packetizer_ =
RtpPacketizer::Create(payload_type, rtp_video_sender_->GetSsrc());
rtp_video_sender_->SetSendDataFunc(
[this](const char* data, size_t size) -> int {
if (!ice_agent_) {
@@ -33,9 +37,31 @@ void VideoChannelSend::Initialize(rtp::PAYLOAD_TYPE payload_type) {
}
ice_io_statistics_->UpdateVideoOutboundBytes((uint32_t)size);
return ice_agent_->Send(data, size);
});
rtp_video_sender_->SetOnSentPacketFunc(
[this](const webrtc::RtpPacketToSend& packet) -> void {
webrtc::PacedPacketInfo pacing_info;
size_t transport_overhead_bytes_per_packet_ = 0;
webrtc::Timestamp creation_time =
webrtc::Timestamp::Millis(clock_->TimeInMilliseconds());
transport_feedback_adapter_.AddPacket(
packet, pacing_info, transport_overhead_bytes_per_packet_,
creation_time);
rtc::SentPacket sent_packet;
sent_packet.packet_id = packet.transport_sequence_number().value();
sent_packet.send_time_ms = clock_->TimeInMilliseconds();
sent_packet.info.included_in_feedback = true;
sent_packet.info.included_in_allocation = true;
sent_packet.info.packet_size_bytes = packet.size();
sent_packet.info.packet_type = rtc::PacketType::kData;
transport_feedback_adapter_.ProcessSentPacket(sent_packet);
});
rtp_video_sender_->Start();
}
@@ -71,7 +97,8 @@ void VideoChannelSend::HandleTransportPacketsFeedback(
// if (transport_is_ecn_capable_) {
// // If transport does not support ECN, packets should not be sent as
// ECT(1).
// // TODO: bugs.webrtc.org/42225697 - adapt to ECN feedback and continue to
// // TODO: bugs.webrtc.org/42225697 - adapt to ECN feedback and
// continue to
// // send packets as ECT(1) if transport is ECN capable.
// transport_is_ecn_capable_ = false;
// LOG_INFO("Transport is {} ECN capable. Stop sending ECT(1)",
@@ -103,8 +130,8 @@ void VideoChannelSend::PostUpdates(webrtc::NetworkControlUpdate update) {
}
void VideoChannelSend::UpdateControlState() {
// std::optional<TargetTransferRate> update = control_handler_->GetUpdate();
// if (!update) return;
// std::optional<TargetTransferRate> update =
// control_handler_->GetUpdate(); if (!update) return;
// retransmission_rate_limiter_.SetMaxRate(update->target_rate.bps());
// observer_->OnTargetTransferRate(*update);
}

View File

@@ -8,6 +8,7 @@
#define _VIDEO_CHANNEL_SEND_H_
#include "api/transport/network_types.h"
#include "clock.h"
#include "congestion_control.h"
#include "congestion_control_feedback.h"
#include "ice_agent.h"
@@ -19,7 +20,8 @@
class VideoChannelSend {
public:
VideoChannelSend();
VideoChannelSend(std::shared_ptr<IceAgent> ice_agent,
VideoChannelSend(std::shared_ptr<webrtc::Clock> clock,
std::shared_ptr<IceAgent> ice_agent,
std::shared_ptr<IOStatistics> ice_io_statistics);
~VideoChannelSend();
@@ -47,6 +49,7 @@ class VideoChannelSend {
std::unique_ptr<RtpVideoSender> rtp_video_sender_ = nullptr;
private:
std::shared_ptr<Clock> clock_;
int64_t current_offset_ = std::numeric_limits<int64_t>::min();
// Used by RFC 8888 congestion control feedback to track base time.
std::optional<uint32_t> last_feedback_compact_ntp_time_;