[feat] update congestion control feedback

This commit is contained in:
dijunkun
2024-12-18 17:27:42 +08:00
parent c6d4b172fc
commit 2512e1eb15
18 changed files with 1417 additions and 2 deletions

View File

@@ -0,0 +1,302 @@
#include "congestion_control_feedback.h"
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <utility>
#include <vector>
#include "array_view.h"
#include "byte_io.h"
#include "log.h"
/*
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|V=2|P| FMT=11 | PT = 205 | length |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| SSRC of RTCP packet sender |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| SSRC of 1st RTP Stream |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| begin_seq | num_reports |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|R|ECN| Arrival time offset | ... .
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
. .
. .
. .
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| SSRC of nth RTP Stream |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| begin_seq | num_reports |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|R|ECN| Arrival time offset | ... |
. .
. .
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Report Timestamp (32 bits) |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
*/
namespace {
constexpr size_t kSenderSsrcLength = 4;
constexpr size_t kHeaderPerMediaSssrcLength = 8;
constexpr size_t kTimestampLength = 4;
// RFC-3168, Section 5
constexpr uint16_t kEcnEct1 = 0x01;
constexpr uint16_t kEcnEct0 = 0x02;
constexpr uint16_t kEcnCe = 0x03;
// Arrival time offset (ATO, 13 bits):
// The arrival time of the RTP packet at the receiver, as an offset before the
// time represented by the Report Timestamp (RTS) field of this RTCP congestion
// control feedback report. The ATO field is in units of 1/1024 seconds (this
// unit is chosen to give exact offsets from the RTS field) so, for example, an
// ATO value of 512 indicates that the corresponding RTP packet arrived exactly
// half a second before the time instant represented by the RTS field. If the
// measured value is greater than 8189/1024 seconds (the value that would be
// coded as 0x1FFD), the value 0x1FFE MUST be reported to indicate an over-range
// measurement. If the measurement is unavailable or if the arrival time of the
// RTP packet is after the time represented by the RTS field, then an ATO value
// of 0x1FFF MUST be reported for the packet.
uint16_t To13bitAto(int64_t arrival_time_offset) {
if (arrival_time_offset < 0) {
return 0x1FFF;
}
return std::min(static_cast<int64_t>(1024 * (arrival_time_offset / 1000)),
int64_t{0x1FFE});
}
int64_t AtoToTimeDelta(uint16_t receive_info) {
// receive_info
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// |R|ECN| Arrival time offset |
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// ato -> second
const uint16_t ato = receive_info & 0x1FFF;
if (ato == 0x1FFE) {
return std::numeric_limits<int64_t>::max();
}
if (ato == 0x1FFF) {
return std::numeric_limits<int64_t>::min();
}
return ato / 1024;
}
uint16_t To2BitEcn(EcnMarking ecn_marking) {
switch (ecn_marking) {
case EcnMarking::kNotEct:
return 0;
case EcnMarking::kEct1:
return kEcnEct1 << 13;
case EcnMarking::kEct0:
return kEcnEct0 << 13;
case EcnMarking::kCe:
return kEcnCe << 13;
}
}
EcnMarking ToEcnMarking(uint16_t receive_info) {
const uint16_t ecn = (receive_info >> 13) & 0b11;
if (ecn == kEcnEct1) {
return EcnMarking::kEct1;
}
if (ecn == kEcnEct0) {
return EcnMarking::kEct0;
}
if (ecn == kEcnCe) {
return EcnMarking::kCe;
}
return EcnMarking::kNotEct;
}
} // namespace
CongestionControlFeedback ::CongestionControlFeedback(
std::vector<PacketInfo> packets, uint32_t compact_ntp_timestamp)
: packets_(std::move(packets)),
report_timestamp_compact_ntp_(compact_ntp_timestamp) {}
bool CongestionControlFeedback::Create(uint8_t* buffer, size_t* position,
size_t max_length,
PacketReadyCallback callback) const {
// Ensure there is enough room for this packet.
while (*position + BlockLength() > max_length) {
if (!OnBufferFull(buffer, position, callback)) return false;
}
const size_t position_end = *position + BlockLength();
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// |V=2|P| FMT=11 | PT = 205 | length |
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// | SSRC of RTCP packet sender |
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
CreateHeader(kFeedbackMessageType, kPacketType, HeaderLength(), buffer,
position);
ByteWriter<uint32_t>::WriteBigEndian(&buffer[*position], sender_ssrc());
*position += 4;
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// | SSRC of nth RTP Stream |
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// | begin_seq | num_reports |
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// |R|ECN| Arrival time offset | ... .
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// . .
auto write_report_for_ssrc = [&](ArrayView<const PacketInfo> packets) {
// SSRC of nth RTP stream.
ByteWriter<uint32_t>::WriteBigEndian(&buffer[*position], packets[0].ssrc);
*position += 4;
// begin_seq
ByteWriter<uint16_t>::WriteBigEndian(&buffer[*position],
packets[0].sequence_number);
*position += 2;
// num_reports
uint16_t num_reports = packets.size();
RTC_DCHECK_EQ(static_cast<uint16_t>(
packets[packets.size() - 1].sequence_number -
packets[0].sequence_number + 1),
packets.size())
<< "Expected continous rtp sequence numbers.";
// Each report block MUST NOT include more than 16384 packet metric
// blocks (i.e., it MUST NOT report on more than one quarter of the
// sequence number space in a single report).
if (num_reports > 16384) {
LOG_FATAL("Unexpected number of reports: {}", num_reports);
return;
}
ByteWriter<uint16_t>::WriteBigEndian(&buffer[*position], num_reports);
*position += 2;
for (const PacketInfo& packet : packets) {
bool received = packet.arrival_time_offset.IsFinite();
uint16_t packet_info = 0;
if (received) {
packet_info = 0x8000 | To2BitEcn(packet.ecn) |
To13bitAto(packet.arrival_time_offset);
}
ByteWriter<uint16_t>::WriteBigEndian(&buffer[*position], packet_info);
*position += 2;
}
// 32bit align per SSRC block.
if (num_reports % 2 != 0) {
ByteWriter<uint16_t>::WriteBigEndian(&buffer[*position], 0);
*position += 2;
}
};
ArrayView<const PacketInfo> remaining(packets_);
while (!remaining.empty()) {
int number_of_packets_for_ssrc = 0;
uint32_t ssrc = remaining[0].ssrc;
for (const PacketInfo& packet_info : remaining) {
if (packet_info.ssrc != ssrc) {
break;
}
++number_of_packets_for_ssrc;
}
write_report_for_ssrc(remaining.subview(0, number_of_packets_for_ssrc));
remaining = remaining.subview(number_of_packets_for_ssrc);
}
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// | Report Timestamp (32 bits) |
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
ByteWriter<uint32_t>::WriteBigEndian(&buffer[*position],
report_timestamp_compact_ntp_);
*position += 4;
RTC_DCHECK_EQ(*position, position_end);
return true;
}
size_t CongestionControlFeedback::BlockLength() const {
// Total size of this packet
size_t total_size = kSenderSsrcLength + kHeaderLength + kTimestampLength;
if (packets_.empty()) {
return total_size;
}
auto increase_size_per_ssrc = [](int number_of_packets_for_ssrc) {
// Each packet report needs two bytes.
size_t packet_block_size = number_of_packets_for_ssrc * 2;
// 32 bit aligned.
return kHeaderPerMediaSssrcLength + packet_block_size +
((number_of_packets_for_ssrc % 2) != 0 ? 2 : 0);
};
uint32_t ssrc = packets_.front().ssrc;
uint16_t first_sequence_number = packets_.front().sequence_number;
for (size_t i = 0; i < packets_.size(); ++i) {
if (packets_[i].ssrc != ssrc) {
uint16_t number_of_packets =
packets_[i - 1].sequence_number - first_sequence_number + 1;
total_size += increase_size_per_ssrc(number_of_packets);
ssrc = packets_[i].ssrc;
first_sequence_number = packets_[i].sequence_number;
}
}
uint16_t number_of_packets =
packets_.back().sequence_number - first_sequence_number + 1;
total_size += increase_size_per_ssrc(number_of_packets);
return total_size;
}
bool CongestionControlFeedback::Parse(const rtcp::CommonHeader& packet) {
const uint8_t* payload = packet.payload();
const uint8_t* payload_end = packet.payload() + packet.payload_size_bytes();
if (packet.payload_size_bytes() % 4 != 0 ||
packet.payload_size_bytes() < kSenderSsrcLength + kTimestampLength) {
return false;
}
SetSenderSsrc(ByteReader<uint32_t>::ReadBigEndian(payload));
payload += 4;
report_timestamp_compact_ntp_ =
ByteReader<uint32_t>::ReadBigEndian(payload_end - 4);
payload_end -= 4;
while (payload + kHeaderPerMediaSssrcLength < payload_end) {
uint32_t ssrc = ByteReader<uint32_t>::ReadBigEndian(payload);
payload += 4;
uint16_t base_seqno = ByteReader<uint16_t>::ReadBigEndian(payload);
payload += 2;
uint16_t num_reports = ByteReader<uint16_t>::ReadBigEndian(payload);
payload += 2;
constexpr size_t kPerPacketLength = 2;
if (payload + kPerPacketLength * num_reports > payload_end) {
return false;
}
for (int i = 0; i < num_reports; ++i) {
uint16_t packet_info = ByteReader<uint16_t>::ReadBigEndian(payload);
payload += 2;
uint16_t seq_no = base_seqno + i;
bool received = (packet_info & 0x8000);
packets_.push_back(
{.ssrc = ssrc,
.sequence_number = seq_no,
.arrival_time_offset = received ? AtoToTimeDelta(packet_info)
: TimeDelta::MinusInfinity(),
.ecn = ToEcnMarking(packet_info)});
}
if (num_reports % 2) {
// 2 bytes padding
payload += 2;
}
}
return payload == payload_end;
}

View File

@@ -0,0 +1,78 @@
/*
* @Author: DI JUNKUN
* @Date: 2024-12-18
* Copyright (c) 2024 by DI JUNKUN, All Rights Reserved.
*/
#ifndef _CONGESTION_CONTROL_FEEDBACK_H_
#define _CONGESTION_CONTROL_FEEDBACK_H_
#include <cstddef>
#include <cstdint>
#include <limits>
#include <vector>
#include "rtp_feedback.h"
// L4S Explicit Congestion Notification (ECN) .
// https://www.rfc-editor.org/rfc/rfc9331.html ECT stands for ECN-Capable
// Transport and CE stands for Congestion Experienced.
// RFC-3168, Section 5
// +-----+-----+
// | ECN FIELD |
// +-----+-----+
// ECT CE [Obsolete] RFC 2481 names for the ECN bits.
// 0 0 Not-ECT
// 0 1 ECT(1)
// 1 0 ECT(0)
// 1 1 CE
enum class EcnMarking {
kNotEct = 0, // Not ECN-Capable Transport
kEct1 = 1, // ECN-Capable Transport
kEct0 = 2, // Not used by L4s (or webrtc.)
kCe = 3, // Congestion experienced
};
// Congestion control feedback message as specified in
// https://www.rfc-editor.org/rfc/rfc8888.html
class CongestionControlFeedback : public RtpFeedback {
public:
struct PacketInfo {
uint32_t ssrc = 0;
uint16_t sequence_number = 0;
// Time offset from report timestamp. Minus infinity if the packet has not
// been received.
int64_t arrival_time_offset = std::numeric_limits<int64_t>::min();
rtc::EcnMarking ecn = rtc::EcnMarking::kNotEct;
};
static constexpr uint8_t kFeedbackMessageType = 11;
// `Packets` MUST be sorted in sequence_number order per SSRC. There MUST not
// be missing sequence numbers between `Packets`. `Packets` MUST not include
// duplicate sequence numbers.
CongestionControlFeedback(std::vector<PacketInfo> packets,
uint32_t report_timestamp_compact_ntp);
CongestionControlFeedback() = default;
bool Parse(const CommonHeader& packet);
rtc::ArrayView<const PacketInfo> packets() const { return packets_; }
uint32_t report_timestamp_compact_ntp() const {
return report_timestamp_compact_ntp_;
}
// Serialize the packet.
bool Create(uint8_t* packet, size_t* position, size_t max_length,
PacketReadyCallback callback) const override;
size_t BlockLength() const override;
private:
std::vector<PacketInfo> packets_;
uint32_t report_timestamp_compact_ntp_ = 0;
};
#endif

View File

@@ -0,0 +1,108 @@
#include "congestion_control_feedback_generator.h"
#include <algorithm>
#include <cstdint>
#include <memory>
#include <optional>
#include <utility>
#include <vector>
uint32_t ConvertToCompactNtp(int64_t now_ms) {
int64_t seconds = now_ms / 1000;
int64_t milliseconds = now_ms % 1000;
uint16_t ntp_seconds = static_cast<uint16_t>(seconds & 0xFFFF);
uint16_t ntp_fraction = static_cast<uint16_t>((milliseconds * 65536) / 1000);
uint32_t compact_ntp = (ntp_seconds << 16) | ntp_fraction;
return compact_ntp;
}
CongestionControlFeedbackGenerator::CongestionControlFeedbackGenerator(
RtcpSender rtcp_sender)
: rtcp_sender_(std::move(rtcp_sender)) {}
void CongestionControlFeedbackGenerator::OnReceivedPacket(
const RtpPacketReceived& packet) {
marker_bit_seen_ |= packet.Marker();
if (!first_arrival_time_since_feedback_) {
first_arrival_time_since_feedback_ = packet.arrival_time();
}
feedback_trackers_[packet.Ssrc()].ReceivedPacket(packet);
if (NextFeedbackTime() < packet.arrival_time()) {
auto now = std::chrono::system_clock::now();
auto now_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
now.time_since_epoch())
.count();
SendFeedback(now_ms);
}
}
int64_t CongestionControlFeedbackGenerator::NextFeedbackTime() const {
auto now = std::chrono::system_clock::now();
auto now_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
now.time_since_epoch())
.count();
if (!first_arrival_time_since_feedback_) {
return std::max(now_ms + min_time_between_feedback_,
next_possible_feedback_send_time_);
}
if (!marker_bit_seen_) {
return std::max(next_possible_feedback_send_time_,
*first_arrival_time_since_feedback_ +
max_time_to_wait_for_packet_with_marker_);
}
return next_possible_feedback_send_time_;
}
int64_t CongestionControlFeedbackGenerator::Process(int64_t now_ms) {
if (NextFeedbackTime() <= now_ms) {
SendFeedback(now_ms);
}
return NextFeedbackTime() - now_ms;
}
void CongestionControlFeedbackGenerator::OnSendBandwidthEstimateChanged(
DataRate estimate) {
// Feedback reports should max occupy 5% of total bandwidth.
max_feedback_rate_ = estimate * 0.05;
}
void CongestionControlFeedbackGenerator::SetTransportOverhead(
DataSize overhead_per_packet) {
packet_overhead_ = overhead_per_packet;
}
void CongestionControlFeedbackGenerator::SendFeedback(int64_t now_ms) {
uint32_t compact_ntp = ConvertToCompactNtp(now_ms);
std::vector<rtcp::CongestionControlFeedback::PacketInfo> rtcp_packet_info;
for (auto& [unused, tracker] : feedback_trackers_) {
tracker.AddPacketsToFeedback(now_ms, rtcp_packet_info);
}
marker_bit_seen_ = false;
first_arrival_time_since_feedback_ = std::nullopt;
auto feedback = std::make_unique<rtcp::CongestionControlFeedback>(
std::move(rtcp_packet_info), compact_ntp);
CalculateNextPossibleSendTime(feedback->BlockLength(), now_ms);
std::vector<std::unique_ptr<rtcp::RtcpPacket>> rtcp_packets;
rtcp_packets.push_back(std::move(feedback));
rtcp_sender_(std::move(rtcp_packets));
}
void CongestionControlFeedbackGenerator::CalculateNextPossibleSendTime(
int64_t feedback_size, int64_t now_ms) {
int64_t time_since_last_sent = now - last_feedback_sent_time_;
size_t debt_payed = time_since_last_sent * max_feedback_rate_;
send_rate_debt_ =
debt_payed > send_rate_debt_ ? 0 : send_rate_debt_ - debt_payed;
send_rate_debt_ += feedback_size + packet_overhead_;
last_feedback_sent_time_ = now_ms;
next_possible_feedback_send_time_ =
now_ms +
std::clamp(max_feedback_rate_ == 0 ? std::numeric_limits<int64_t>::max()
: send_rate_debt_ / max_feedback_rate_,
min_time_between_feedback_, max_time_between_feedback_);
}

View File

@@ -0,0 +1,59 @@
/*
* @Author: DI JUNKUN
* @Date: 2024-12-18
* Copyright (c) 2024 by DI JUNKUN, All Rights Reserved.
*/
#ifndef _CONGESTION_CONTROL_FEEDBACK_GENERATOR_H_
#define _CONGESTION_CONTROL_FEEDBACK_GENERATOR_H_
#include "rtp_packet_received.h"
class CongestionControlFeedbackGenerator
: public RtpTransportFeedbackGenerator {
public:
CongestionControlFeedbackGenerator(
RtpTransportFeedbackGenerator::RtcpSender feedback_sender);
~CongestionControlFeedbackGenerator() = default;
void OnReceivedPacket(const RtpPacketReceived& packet) override;
void OnSendBandwidthEstimateChanged(DataRate estimate) override;
int64_t Process(int64_t now_ms) override;
void SetTransportOverhead(DataSize overhead_per_packet) override;
private:
int64_t NextFeedbackTime() const;
void SendFeedback(int64_t now_ms);
void CalculateNextPossibleSendTime(int64_t feedback_size, int64_t now_ms);
private:
// Feedback should not use more than 5% of the configured send bandwidth
// estimate. Min and max duration between feedback is configurable using field
// trials, but per default, min is 25ms and max is 250ms.
// If possible, given the other constraints, feedback will be sent when a
// packet with marker bit is received in order to provide feedback as soon as
// possible after receiving a complete video frame. If no packet with marker
// bit is received, feedback can be delayed up to 25ms after the first packet
// since the last sent feedback. On good networks, this means that a sender
// may receive feedback for every sent frame.
int64_t min_time_between_feedback_ = 25;
int64_t max_time_between_feedback_ = 250;
int64_t max_time_to_wait_for_packet_with_marker_ = 25;
int64_t max_feedback_rate_ = 1000; // kbps
int64_t packet_overhead_ = 0;
int64_t send_rate_debt_ = 0;
std::optional<int64_t> first_arrival_time_since_feedback_;
int64_t next_possible_feedback_send_time_ = 0;
int64_t last_feedback_sent_time_ = 0;
bool marker_bit_seen_ = false;
};
#endif

View File

@@ -0,0 +1,87 @@
#include "congestion_control_feedback_tracker.h"
#include <cstdint>
#include <tuple>
#include <vector>
void CongestionControlFeedbackTracker::ReceivedPacket(
const RtpPacketReceived& packet) {
int64_t unwrapped_sequence_number =
unwrapper_.Unwrap(packet.SequenceNumber());
if (last_sequence_number_in_feedback_ &&
unwrapped_sequence_number < *last_sequence_number_in_feedback_ + 1) {
RTC_LOG(LS_WARNING)
<< "Received packet unorderered between feeedback. SSRC: "
<< packet.Ssrc() << " Seq: " << packet.SequenceNumber()
<< " last feedback: "
<< static_cast<uint16_t>(*last_sequence_number_in_feedback_);
// TODO: bugs.webrtc.org/374550342 - According to spec, the old packets
// should be reported again. But at the moment, we dont store history of
// packet we already reported and thus, they will be reported as lost. Note
// that this is likely not a problem in webrtc since the packets will also
// be removed from the send history when they are first reported as
// received.
last_sequence_number_in_feedback_ = unwrapped_sequence_number - 1;
}
packets_.push_back({.ssrc = packet.Ssrc(),
.unwrapped_sequence_number = unwrapped_sequence_number,
.arrival_time = packet.arrival_time(),
.ecn = packet.ecn()});
}
void CongestionControlFeedbackTracker::AddPacketsToFeedback(
int64_t feedback_time,
std::vector<CongestionControlFeedback::PacketInfo>& packet_feedback) {
if (packets_.empty()) {
return;
}
absl::c_sort(packets_, [](const PacketInfo& a, const PacketInfo& b) {
return std::tie(a.unwrapped_sequence_number, a.arrival_time) <
std::tie(b.unwrapped_sequence_number, b.arrival_time);
});
if (!last_sequence_number_in_feedback_) {
last_sequence_number_in_feedback_ =
packets_.front().unwrapped_sequence_number - 1;
}
auto packet_it = packets_.begin();
uint32_t ssrc = packet_it->ssrc;
for (int64_t sequence_number = *last_sequence_number_in_feedback_ + 1;
sequence_number <= packets_.back().unwrapped_sequence_number;
++sequence_number) {
RTC_DCHECK(packet_it != packets_.end());
RTC_DCHECK_EQ(ssrc, packet_it->ssrc);
rtc::EcnMarking ecn = rtc::EcnMarking::kNotEct;
TimeDelta arrival_time_offset = TimeDelta::MinusInfinity();
if (sequence_number == packet_it->unwrapped_sequence_number) {
arrival_time_offset = feedback_time - packet_it->arrival_time;
ecn = packet_it->ecn;
++packet_it;
while (packet_it != packets_.end() &&
packet_it->unwrapped_sequence_number == sequence_number) {
// According to RFC 8888:
// If duplicate copies of a particular RTP packet are received, then the
// arrival time of the first copy to arrive MUST be reported. If any of
// the copies of the duplicated packet are ECN-CE marked, then an ECN-CE
// mark MUST be reported for that packet; otherwise, the ECN mark of the
// first copy to arrive is reported.
if (packet_it->ecn == rtc::EcnMarking::kCe) {
ecn = rtc::EcnMarking::kCe;
}
RTC_LOG(LS_WARNING) << "Received duplicate packet ssrc:" << ssrc
<< " seq:" << static_cast<uint16_t>(sequence_number)
<< " ecn: " << static_cast<int>(ecn);
++packet_it;
}
} // else - the packet has not been received yet.
packet_feedback.push_back(
{.ssrc = ssrc,
.sequence_number = static_cast<uint16_t>(sequence_number),
.arrival_time_offset = arrival_time_offset,
.ecn = ecn});
}
last_sequence_number_in_feedback_ = packets_.back().unwrapped_sequence_number;
packets_.clear();
}

View File

@@ -0,0 +1,43 @@
/*
* @Author: DI JUNKUN
* @Date: 2024-12-18
* Copyright (c) 2024 by DI JUNKUN, All Rights Reserved.
*/
#ifndef _CONGESTION_CONTROL_FEEDBACK_TRACKER_H_
#define _CONGESTION_CONTROL_FEEDBACK_TRACKER_H_
#include <optional>
#include <vector>
#include "rtp_packet_received.h"
class CongestionControlFeedbackTracker {
public:
CongestionControlFeedbackTracker() = default;
void ReceivedPacket(const RtpPacketReceived& packet);
// Adds received packets to `packet_feedback`
// RTP sequence numbers are continous from the last created feedback unless
// reordering has occured between feedback packets. If so, the sequence
// number range may overlap with previousely sent feedback.
void AddPacketsToFeedback(
int64_t feedback_time,
std::vector<CongestionControlFeedback::PacketInfo>& packet_feedback);
private:
struct PacketInfo {
uint32_t ssrc;
int64_t unwrapped_sequence_number = 0;
int64_t arrival_time;
rtc::EcnMarking ecn = rtc::EcnMarking::kNotEct;
};
std::optional<int64_t> last_sequence_number_in_feedback_;
SeqNumUnwrapper<uint16_t> unwrapper_;
std::vector<PacketInfo> packets_;
};
#endif

View File

@@ -0,0 +1,156 @@
/*
* Copyright (c) 2017 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#include "receive_side_congestion_controller.h"
#include <algorithm>
#include <memory>
#include <utility>
namespace {
static const uint32_t kTimeOffsetSwitchThreshold = 30;
} // namespace
void ReceiveSideCongestionController::OnRttUpdate(int64_t avg_rtt_ms,
int64_t max_rtt_ms) {
MutexLock lock(&mutex_);
rbe_->OnRttUpdate(avg_rtt_ms, max_rtt_ms);
}
void ReceiveSideCongestionController::RemoveStream(uint32_t ssrc) {
MutexLock lock(&mutex_);
rbe_->RemoveStream(ssrc);
}
DataRate ReceiveSideCongestionController::LatestReceiveSideEstimate() const {
MutexLock lock(&mutex_);
return rbe_->LatestEstimate();
}
void ReceiveSideCongestionController::PickEstimator(
bool has_absolute_send_time) {
if (has_absolute_send_time) {
// If we see AST in header, switch RBE strategy immediately.
if (!using_absolute_send_time_) {
RTC_LOG(LS_INFO)
<< "WrappingBitrateEstimator: Switching to absolute send time RBE.";
using_absolute_send_time_ = true;
rbe_ = std::make_unique<RemoteBitrateEstimatorAbsSendTime>(
env_, &remb_throttler_);
}
packets_since_absolute_send_time_ = 0;
} else {
// When we don't see AST, wait for a few packets before going back to TOF.
if (using_absolute_send_time_) {
++packets_since_absolute_send_time_;
if (packets_since_absolute_send_time_ >= kTimeOffsetSwitchThreshold) {
RTC_LOG(LS_INFO)
<< "WrappingBitrateEstimator: Switching to transmission "
"time offset RBE.";
using_absolute_send_time_ = false;
rbe_ = std::make_unique<RemoteBitrateEstimatorSingleStream>(
env_, &remb_throttler_);
}
}
}
}
ReceiveSideCongestionController::ReceiveSideCongestionController(
const Environment& env,
TransportSequenceNumberFeedbackGenenerator::RtcpSender feedback_sender,
RembThrottler::RembSender remb_sender,
absl::Nullable<NetworkStateEstimator*> network_state_estimator)
: env_(env),
remb_throttler_(std::move(remb_sender), &env_.clock()),
transport_sequence_number_feedback_generator_(feedback_sender,
network_state_estimator),
congestion_control_feedback_generator_(env, feedback_sender),
rbe_(std::make_unique<RemoteBitrateEstimatorSingleStream>(
env_, &remb_throttler_)),
using_absolute_send_time_(false),
packets_since_absolute_send_time_(0) {
FieldTrialParameter<bool> force_send_rfc8888_feedback("force_send", false);
ParseFieldTrial(
{&force_send_rfc8888_feedback},
env.field_trials().Lookup("WebRTC-RFC8888CongestionControlFeedback"));
if (force_send_rfc8888_feedback) {
EnablSendCongestionControlFeedbackAccordingToRfc8888();
}
}
void ReceiveSideCongestionController::
EnablSendCongestionControlFeedbackAccordingToRfc8888() {
RTC_DCHECK_RUN_ON(&sequence_checker_);
send_rfc8888_congestion_feedback_ = true;
}
void ReceiveSideCongestionController::OnReceivedPacket(
const RtpPacketReceived& packet, MediaType media_type) {
if (send_rfc8888_congestion_feedback_) {
RTC_DCHECK_RUN_ON(&sequence_checker_);
congestion_control_feedback_generator_.OnReceivedPacket(packet);
return;
}
bool has_transport_sequence_number =
packet.HasExtension<TransportSequenceNumber>() ||
packet.HasExtension<TransportSequenceNumberV2>();
if (media_type == MediaType::AUDIO && !has_transport_sequence_number) {
// For audio, we only support send side BWE.
return;
}
if (has_transport_sequence_number) {
// Send-side BWE.
transport_sequence_number_feedback_generator_.OnReceivedPacket(packet);
} else {
// Receive-side BWE.
MutexLock lock(&mutex_);
PickEstimator(packet.HasExtension<AbsoluteSendTime>());
rbe_->IncomingPacket(packet);
}
}
void ReceiveSideCongestionController::OnBitrateChanged(int bitrate_bps) {
RTC_DCHECK_RUN_ON(&sequence_checker_);
DataRate send_bandwidth_estimate = DataRate::BitsPerSec(bitrate_bps);
transport_sequence_number_feedback_generator_.OnSendBandwidthEstimateChanged(
send_bandwidth_estimate);
congestion_control_feedback_generator_.OnSendBandwidthEstimateChanged(
send_bandwidth_estimate);
}
TimeDelta ReceiveSideCongestionController::MaybeProcess() {
Timestamp now = env_.clock().CurrentTime();
if (send_rfc8888_congestion_feedback_) {
RTC_DCHECK_RUN_ON(&sequence_checker_);
return congestion_control_feedback_generator_.Process(now);
}
mutex_.Lock();
TimeDelta time_until_rbe = rbe_->Process();
mutex_.Unlock();
TimeDelta time_until_rep =
transport_sequence_number_feedback_generator_.Process(now);
TimeDelta time_until = std::min(time_until_rbe, time_until_rep);
return std::max(time_until, TimeDelta::Zero());
}
void ReceiveSideCongestionController::SetMaxDesiredReceiveBitrate(
DataRate bitrate) {
remb_throttler_.SetMaxDesiredReceiveBitrate(bitrate);
}
void ReceiveSideCongestionController::SetTransportOverhead(
DataSize overhead_per_packet) {
RTC_DCHECK_RUN_ON(&sequence_checker_);
transport_sequence_number_feedback_generator_.SetTransportOverhead(
overhead_per_packet);
congestion_control_feedback_generator_.SetTransportOverhead(
overhead_per_packet);
}

View File

@@ -0,0 +1,69 @@
/*
* @Author: DI JUNKUN
* @Date: 2024-12-12
* Copyright (c) 2024 by DI JUNKUN, All Rights Reserved.
*/
#ifndef _RECEIVE_SIDE_CONGESTION_CONTROLLER_H_
#define _RECEIVE_SIDE_CONGESTION_CONTROLLER_H_
class ReceiveSideCongestionController {
public:
ReceiveSideCongestionController();
~ReceiveSideCongestionController() override = default;
public:
void EnablSendCongestionControlFeedbackAccordingToRfc8888();
void OnReceivedPacket(const RtpPacketReceived& packet, MediaType media_type);
// Implements CallStatsObserver.
void OnRttUpdate(int64_t avg_rtt_ms, int64_t max_rtt_ms) override;
// This is send bitrate, used to control the rate of feedback messages.
void OnBitrateChanged(int bitrate_bps);
// Ensures the remote party is notified of the receive bitrate no larger than
// `bitrate` using RTCP REMB.
void SetMaxDesiredReceiveBitrate(DataRate bitrate);
void SetTransportOverhead(DataSize overhead_per_packet);
// Returns latest receive side bandwidth estimation.
// Returns zero if receive side bandwidth estimation is unavailable.
DataRate LatestReceiveSideEstimate() const;
// Removes stream from receive side bandwidth estimation.
// Noop if receive side bwe is not used or stream doesn't participate in it.
void RemoveStream(uint32_t ssrc);
// Runs periodic tasks if it is time to run them, returns time until next
// call to `MaybeProcess` should be non idle.
TimeDelta MaybeProcess();
private:
void PickEstimator(bool has_absolute_send_time)
RTC_EXCLUSIVE_LOCKS_REQUIRED(mutex_);
const Environment env_;
RembThrottler remb_throttler_;
// TODO: bugs.webrtc.org/42224904 - Use sequence checker for all usage of
// ReceiveSideCongestionController. At the time of
// writing OnReceivedPacket and MaybeProcess can unfortunately be called on an
// arbitrary thread by external projects.
SequenceChecker sequence_checker_;
bool send_rfc8888_congestion_feedback_ = false;
TransportSequenceNumberFeedbackGenenerator
transport_sequence_number_feedback_generator_;
CongestionControlFeedbackGenerator congestion_control_feedback_generator_
RTC_GUARDED_BY(sequence_checker_);
mutable Mutex mutex_;
std::unique_ptr<RemoteBitrateEstimator> rbe_ RTC_GUARDED_BY(mutex_);
bool using_absolute_send_time_ RTC_GUARDED_BY(mutex_);
uint32_t packets_since_absolute_send_time_ RTC_GUARDED_BY(mutex_);
};
#endif