[feat] update qos module

This commit is contained in:
dijunkun
2025-01-07 17:31:14 +08:00
parent 601fedfd76
commit 7a84b25b5c
14 changed files with 224 additions and 83 deletions

View File

@@ -158,25 +158,28 @@ bool CongestionControlFeedback::Create(uint8_t* buffer, size_t* position,
*position += 2;
// num_reports
uint16_t num_reports = packets.size();
RTC_DCHECK_EQ(static_cast<uint16_t>(
packets[packets.size() - 1].sequence_number -
packets[0].sequence_number + 1),
packets.size())
<< "Expected continous rtp sequence numbers.";
if (static_cast<uint16_t>(packets[packets.size() - 1].sequence_number -
packets[0].sequence_number + 1) !=
packets.size()) {
LOG_FATAL("Expected continous rtp sequence numbers");
return false;
}
// Each report block MUST NOT include more than 16384 packet metric
// blocks (i.e., it MUST NOT report on more than one quarter of the
// sequence number space in a single report).
if (num_reports > 16384) {
LOG_FATAL("Unexpected number of reports: {}", num_reports);
return;
return false;
}
ByteWriter<uint16_t>::WriteBigEndian(&buffer[*position], num_reports);
*position += 2;
for (const PacketInfo& packet : packets) {
bool received = packet.arrival_time_offset.IsFinite();
bool received =
(packet.arrival_time_offset != std::numeric_limits<int64_t>::min()) &&
(packet.arrival_time_offset != std::numeric_limits<int64_t>::max());
uint16_t packet_info = 0;
if (received) {
packet_info = 0x8000 | To2BitEcn(packet.ecn) |
@@ -213,7 +216,9 @@ bool CongestionControlFeedback::Create(uint8_t* buffer, size_t* position,
report_timestamp_compact_ntp_);
*position += 4;
RTC_DCHECK_EQ(*position, position_end);
if (*position != position_end) {
return false;
}
return true;
}

View File

@@ -12,6 +12,7 @@
#include <limits>
#include <vector>
#include "array_view.h"
#include "rtp_feedback.h"
// L4S Explicit Congestion Notification (ECN) .
@@ -45,7 +46,7 @@ class CongestionControlFeedback : public RtpFeedback {
// Time offset from report timestamp. Minus infinity if the packet has not
// been received.
int64_t arrival_time_offset = std::numeric_limits<int64_t>::min();
rtc::EcnMarking ecn = rtc::EcnMarking::kNotEct;
EcnMarking ecn = EcnMarking::kNotEct;
};
static constexpr uint8_t kFeedbackMessageType = 11;
@@ -59,7 +60,7 @@ class CongestionControlFeedback : public RtpFeedback {
bool Parse(const CommonHeader& packet);
rtc::ArrayView<const PacketInfo> packets() const { return packets_; }
ArrayView<const PacketInfo> packets() const { return packets_; }
uint32_t report_timestamp_compact_ntp() const {
return report_timestamp_compact_ntp_;

View File

@@ -7,8 +7,32 @@
#ifndef _CONGESTION_CONTROL_FEEDBACK_GENERATOR_H_
#define _CONGESTION_CONTROL_FEEDBACK_GENERATOR_H_
#include <optional>
#include <vector>
#include "rtcp_packet.h"
#include "rtp_packet_received.h"
class RtpTransportFeedbackGenerator {
public:
// Function intented to be used for sending RTCP messages generated by an
// implementation of this class.
using RtcpSender =
std::function<void(std::vector<std::unique_ptr<RtcpPacket>> packets)>;
virtual ~RtpTransportFeedbackGenerator() = default;
virtual void OnReceivedPacket(const RtpPacketReceived& packet) = 0;
// Sends periodic feedback if it is time to send it.
// Returns time until next call to Process should be made.
virtual int64_t Process(int64_t now) = 0;
virtual void OnSendBandwidthEstimateChanged(int64_t estimate) = 0;
// Overhead from transport layers below RTP. Ie, IP, SRTP.
virtual void SetTransportOverhead(DataSize overhead_per_packet) = 0;
};
class CongestionControlFeedbackGenerator
: public RtpTransportFeedbackGenerator {
public:
@@ -18,7 +42,7 @@ class CongestionControlFeedbackGenerator
void OnReceivedPacket(const RtpPacketReceived& packet) override;
void OnSendBandwidthEstimateChanged(DataRate estimate) override;
void OnSendBandwidthEstimateChanged(int64_t estimate) override;
int64_t Process(int64_t now_ms) override;

View File

@@ -60,7 +60,7 @@ void CongestionControlFeedbackTracker::AddPacketsToFeedback(
return;
}
rtc::EcnMarking ecn = rtc::EcnMarking::kNotEct;
EcnMarking ecn = EcnMarking::kNotEct;
TimeDelta arrival_time_offset = TimeDelta::MinusInfinity();
if (sequence_number == packet_it->unwrapped_sequence_number) {
@@ -75,8 +75,8 @@ void CongestionControlFeedbackTracker::AddPacketsToFeedback(
// the copies of the duplicated packet are ECN-CE marked, then an ECN-CE
// mark MUST be reported for that packet; otherwise, the ECN mark of the
// first copy to arrive is reported.
if (packet_it->ecn == rtc::EcnMarking::kCe) {
ecn = rtc::EcnMarking::kCe;
if (packet_it->ecn == EcnMarking::kCe) {
ecn = EcnMarking::kCe;
}
LOG_WARN("Received duplicate packet ssrc: {} seq: {} ecn: {}", ssrc,
static_cast<uint16_t>(sequence_number), static_cast<int>(ecn));

View File

@@ -10,6 +10,7 @@
#include <optional>
#include <vector>
#include "congestion_control_feedback.h"
#include "rtp_packet_received.h"
class CongestionControlFeedbackTracker {
@@ -31,7 +32,7 @@ class CongestionControlFeedbackTracker {
uint32_t ssrc;
int64_t unwrapped_sequence_number = 0;
int64_t arrival_time;
rtc::EcnMarking ecn = rtc::EcnMarking::kNotEct;
EcnMarking ecn = EcnMarking::kNotEct;
};
std::optional<int64_t> last_sequence_number_in_feedback_;

View File

@@ -20,17 +20,17 @@ static const uint32_t kTimeOffsetSwitchThreshold = 30;
void ReceiveSideCongestionController::OnRttUpdate(int64_t avg_rtt_ms,
int64_t max_rtt_ms) {
MutexLock lock(&mutex_);
std::lock_guard<std::mutex> guard(mutex_);
rbe_->OnRttUpdate(avg_rtt_ms, max_rtt_ms);
}
void ReceiveSideCongestionController::RemoveStream(uint32_t ssrc) {
MutexLock lock(&mutex_);
std::lock_guard<std::mutex> guard(mutex_);
rbe_->RemoveStream(ssrc);
}
DataRate ReceiveSideCongestionController::LatestReceiveSideEstimate() const {
MutexLock lock(&mutex_);
int64_t ReceiveSideCongestionController::LatestReceiveSideEstimate() const {
std::lock_guard<std::mutex> guard(mutex_);
return rbe_->LatestEstimate();
}
@@ -42,8 +42,8 @@ void ReceiveSideCongestionController::PickEstimator(
RTC_LOG(LS_INFO)
<< "WrappingBitrateEstimator: Switching to absolute send time RBE.";
using_absolute_send_time_ = true;
rbe_ = std::make_unique<RemoteBitrateEstimatorAbsSendTime>(
env_, &remb_throttler_);
// rbe_ = std::make_unique<RemoteBitrateEstimatorAbsSendTime>(
// env_, &remb_throttler_);
}
packets_since_absolute_send_time_ = 0;
} else {
@@ -55,8 +55,8 @@ void ReceiveSideCongestionController::PickEstimator(
<< "WrappingBitrateEstimator: Switching to transmission "
"time offset RBE.";
using_absolute_send_time_ = false;
rbe_ = std::make_unique<RemoteBitrateEstimatorSingleStream>(
env_, &remb_throttler_);
// rbe_ = std::make_unique<RemoteBitrateEstimatorSingleStream>(
// env_, &remb_throttler_);
}
}
}
@@ -68,12 +68,10 @@ ReceiveSideCongestionController::ReceiveSideCongestionController(
RembThrottler::RembSender remb_sender,
absl::Nullable<NetworkStateEstimator*> network_state_estimator)
: env_(env),
remb_throttler_(std::move(remb_sender), &env_.clock()),
transport_sequence_number_feedback_generator_(feedback_sender,
network_state_estimator),
// remb_throttler_(std::move(remb_sender), &env_.clock()),,
congestion_control_feedback_generator_(env, feedback_sender),
rbe_(std::make_unique<RemoteBitrateEstimatorSingleStream>(
env_, &remb_throttler_)),
// rbe_(std::make_unique<RemoteBitrateEstimatorSingleStream>(
// env_, &remb_throttler_)),
using_absolute_send_time_(false),
packets_since_absolute_send_time_(0) {
FieldTrialParameter<bool> force_send_rfc8888_feedback("force_send", false);
@@ -87,41 +85,22 @@ ReceiveSideCongestionController::ReceiveSideCongestionController(
void ReceiveSideCongestionController::
EnablSendCongestionControlFeedbackAccordingToRfc8888() {
RTC_DCHECK_RUN_ON(&sequence_checker_);
// RTC_DCHECK_RUN_ON(&sequence_checker_);
send_rfc8888_congestion_feedback_ = true;
}
void ReceiveSideCongestionController::OnReceivedPacket(
const RtpPacketReceived& packet, MediaType media_type) {
if (send_rfc8888_congestion_feedback_) {
RTC_DCHECK_RUN_ON(&sequence_checker_);
// RTC_DCHECK_RUN_ON(&sequence_checker_);
congestion_control_feedback_generator_.OnReceivedPacket(packet);
return;
}
bool has_transport_sequence_number =
packet.HasExtension<TransportSequenceNumber>() ||
packet.HasExtension<TransportSequenceNumberV2>();
if (media_type == MediaType::AUDIO && !has_transport_sequence_number) {
// For audio, we only support send side BWE.
return;
}
if (has_transport_sequence_number) {
// Send-side BWE.
transport_sequence_number_feedback_generator_.OnReceivedPacket(packet);
} else {
// Receive-side BWE.
MutexLock lock(&mutex_);
PickEstimator(packet.HasExtension<AbsoluteSendTime>());
rbe_->IncomingPacket(packet);
}
}
void ReceiveSideCongestionController::OnBitrateChanged(int bitrate_bps) {
RTC_DCHECK_RUN_ON(&sequence_checker_);
DataRate send_bandwidth_estimate = DataRate::BitsPerSec(bitrate_bps);
transport_sequence_number_feedback_generator_.OnSendBandwidthEstimateChanged(
send_bandwidth_estimate);
// RTC_DCHECK_RUN_ON(&sequence_checker_);
int64_t send_bandwidth_estimate = int64_t::BitsPerSec(bitrate_bps);
congestion_control_feedback_generator_.OnSendBandwidthEstimateChanged(
send_bandwidth_estimate);
}
@@ -129,28 +108,19 @@ void ReceiveSideCongestionController::OnBitrateChanged(int bitrate_bps) {
int64_t ReceiveSideCongestionController::MaybeProcess() {
int64_t now = env_.clock().CurrentTime();
if (send_rfc8888_congestion_feedback_) {
RTC_DCHECK_RUN_ON(&sequence_checker_);
// RTC_DCHECK_RUN_ON(&sequence_checker_);
return congestion_control_feedback_generator_.Process(now);
}
mutex_.Lock();
TimeDelta time_until_rbe = rbe_->Process();
mutex_.Unlock();
TimeDelta time_until_rep =
transport_sequence_number_feedback_generator_.Process(now);
TimeDelta time_until = std::min(time_until_rbe, time_until_rep);
return std::max(time_until, TimeDelta::Zero());
}
void ReceiveSideCongestionController::SetMaxDesiredReceiveBitrate(
DataRate bitrate) {
remb_throttler_.SetMaxDesiredReceiveBitrate(bitrate);
int64_t bitrate) {
// remb_throttler_.SetMaxDesiredReceiveBitrate(bitrate);
}
void ReceiveSideCongestionController::SetTransportOverhead(
DataSize overhead_per_packet) {
RTC_DCHECK_RUN_ON(&sequence_checker_);
transport_sequence_number_feedback_generator_.SetTransportOverhead(
overhead_per_packet);
int64_t overhead_per_packet) {
// RTC_DCHECK_RUN_ON(&sequence_checker_);
congestion_control_feedback_generator_.SetTransportOverhead(
overhead_per_packet);
}

View File

@@ -7,6 +7,11 @@
#ifndef _RECEIVE_SIDE_CONGESTION_CONTROLLER_H_
#define _RECEIVE_SIDE_CONGESTION_CONTROLLER_H_
#include <mutex>
#include "congestion_control_feedback_generator.h"
#include "rtp_packet_received.h"
class ReceiveSideCongestionController {
public:
enum MediaType { VIDEO, AUDIO, DATA };
@@ -26,13 +31,13 @@ class ReceiveSideCongestionController {
// Ensures the remote party is notified of the receive bitrate no larger than
// `bitrate` using RTCP REMB.
void SetMaxDesiredReceiveBitrate(DataRate bitrate);
void SetMaxDesiredReceiveBitrate(int64_t bitrate);
void SetTransportOverhead(DataSize overhead_per_packet);
void SetTransportOverhead(int64_t overhead_per_packet);
// Returns latest receive side bandwidth estimation.
// Returns zero if receive side bandwidth estimation is unavailable.
DataRate LatestReceiveSideEstimate() const;
int64_t LatestReceiveSideEstimate() const;
// Removes stream from receive side bandwidth estimation.
// Noop if receive side bwe is not used or stream doesn't participate in it.
@@ -45,23 +50,20 @@ class ReceiveSideCongestionController {
private:
void PickEstimator(bool has_absolute_send_time);
RembThrottler remb_throttler_;
// RembThrottler remb_throttler_;
// TODO: bugs.webrtc.org/42224904 - Use sequence checker for all usage of
// ReceiveSideCongestionController. At the time of
// writing OnReceivedPacket and MaybeProcess can unfortunately be called on an
// arbitrary thread by external projects.
SequenceChecker sequence_checker_;
// SequenceChecker sequence_checker_;
bool send_rfc8888_congestion_feedback_ = false;
TransportSequenceNumberFeedbackGenenerator
transport_sequence_number_feedback_generator_;
CongestionControlFeedbackGenerator congestion_control_feedback_generator_
RTC_GUARDED_BY(sequence_checker_);
CongestionControlFeedbackGenerator congestion_control_feedback_generator_;
mutable Mutex mutex_;
std::unique_ptr<RemoteBitrateEstimator> rbe_ RTC_GUARDED_BY(mutex_);
bool using_absolute_send_time_ RTC_GUARDED_BY(mutex_);
std::mutex mutex_;
std::unique_ptr<RemoteBitrateEstimator> rbe_;
bool using_absolute_send_time_;
uint32_t packets_since_absolute_send_time_ RTC_GUARDED_BY(mutex_);
};

View File

@@ -0,0 +1,84 @@
/*
* Copyright (c) 2016 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#include "common_header.h"
#include "byte_io.h"
#include "log.h"
// 0 1 1 2 3
// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// 0 |V=2|P| C/F |
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// 1 | Packet Type |
// ----------------+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// 2 | length |
// --------------------------------+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
//
// Common header for all RTCP packets, 4 octets.
bool CommonHeader::Parse(const uint8_t* buffer, size_t size_bytes) {
const uint8_t kVersion = 2;
if (size_bytes < kHeaderSizeBytes) {
LOG_WARN(
"Too little data ({}) remaining in buffer to parse RTCP header (4 "
"bytes)",
size_bytes);
return false;
}
uint8_t version = buffer[0] >> 6;
if (version != kVersion) {
LOG_WARN("Invalid RTCP header: Version must be {} but was {}",
static_cast<int>(kVersion), static_cast<int>(version));
return false;
}
bool has_padding = (buffer[0] & 0x20) != 0;
count_or_format_ = buffer[0] & 0x1F;
packet_type_ = buffer[1];
payload_size_ = ByteReader<uint16_t>::ReadBigEndian(&buffer[2]) * 4;
payload_ = buffer + kHeaderSizeBytes;
padding_size_ = 0;
if (size_bytes < kHeaderSizeBytes + payload_size_) {
LOG_WARN(
"Buffer too small ({}) to fit an RtcpPacket with a header and ({} "
"bytes)",
size_bytes, payload_size_);
return false;
}
if (has_padding) {
if (payload_size_ == 0) {
LOG_WARN(
"Invalid RTCP header: Padding bit set but 0 payload size "
"specified");
return false;
}
padding_size_ = payload_[payload_size_ - 1];
if (padding_size_ == 0) {
LOG_WARN(
"Invalid RTCP header: Padding bit set but 0 padding size specified");
return false;
}
if (padding_size_ > payload_size_) {
LOG_WARN(
"Invalid RTCP header: Too many padding bytes ({}) for a packet "
"payload size of ({}) bytes",
padding_size_, payload_size_);
return false;
}
payload_size_ -= padding_size_;
}
return true;
}

View File

@@ -0,0 +1,48 @@
/*
* Copyright (c) 2016 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#ifndef MODULES_RTP_RTCP_SOURCE_RTCP_PACKET_COMMON_HEADER_H_
#define MODULES_RTP_RTCP_SOURCE_RTCP_PACKET_COMMON_HEADER_H_
#include <stddef.h>
#include <stdint.h>
class CommonHeader {
public:
static constexpr size_t kHeaderSizeBytes = 4;
CommonHeader() {}
CommonHeader(const CommonHeader&) = default;
CommonHeader& operator=(const CommonHeader&) = default;
bool Parse(const uint8_t* buffer, size_t size_bytes);
uint8_t type() const { return packet_type_; }
// Depending on packet type same header field can be used either as count or
// as feedback message type (fmt). Caller expected to know how it is used.
uint8_t fmt() const { return count_or_format_; }
uint8_t count() const { return count_or_format_; }
size_t payload_size_bytes() const { return payload_size_; }
const uint8_t* payload() const { return payload_; }
size_t packet_size() const {
return kHeaderSizeBytes + payload_size_ + padding_size_;
}
// Returns pointer to the next RTCP packet in compound packet.
const uint8_t* NextPacket() const {
return payload_ + payload_size_ + padding_size_;
}
private:
uint8_t packet_type_ = 0;
uint8_t count_or_format_ = 0;
uint8_t padding_size_ = 0;
uint32_t payload_size_ = 0;
const uint8_t* payload_ = nullptr;
};
#endif // MODULES_RTP_RTCP_SOURCE_RTCP_PACKET_COMMON_HEADER_H_

View File

@@ -10,6 +10,7 @@
#include <stddef.h>
#include <stdint.h>
#include "common_header.h"
#include "rtcp_packet.h"
// RTPFB: Transport layer feedback message.

View File

@@ -72,7 +72,7 @@ void RtpVideoReceiver::InsertRtpPacket(RtpPacket& rtp_packet) {
rtcp_rr.Encode();
// SendRtcpRR(rtcp_rr);
SendRtcpRR(rtcp_rr);
}
if (rtp_packet.PayloadType() == RtpPacket::PAYLOAD_TYPE::AV1) {
ProcessAv1RtpPacket(rtp_packet);
@@ -371,7 +371,7 @@ void RtpVideoReceiver::RtcpThread() {
if (rtcp_stop_) break;
send_rtcp_rr_triggered_ = false;
} else {
LOG_ERROR("Send video tcc");
// LOG_ERROR("Send video tcc");
auto now = std::chrono::steady_clock::now();
auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
now - last_send_rtcp_rr_ts_)

View File

@@ -8,6 +8,7 @@
#include "fec_decoder.h"
#include "io_statistics.h"
#include "receive_side_congestion_controller.h"
#include "ringbuffer.h"
#include "rtcp_receiver_report.h"
#include "rtp_codec.h"
@@ -83,6 +84,9 @@ class RtpVideoReceiver : public ThreadBase {
std::atomic<bool> rtcp_stop_ = false;
int rtcp_rr_interval_ms_ = 5000;
int rtcp_tcc_interval_ms_ = 200;
private:
ReceiveSideCongestionController congestion_controller_;
};
#endif

View File

@@ -15,7 +15,7 @@ class RtpPacketReceived : public RtpPacket {
public:
RtpPacketReceived();
explicit RtpPacketReceived(
int64_t arrival_time = std::numeric_limits<int64_t>::min());
int64_t arrival_time = (std::numeric_limits<int64_t>::min)());
RtpPacketReceived(const RtpPacketReceived& packet);
RtpPacketReceived(RtpPacketReceived&& packet);
@@ -25,7 +25,7 @@ class RtpPacketReceived : public RtpPacket {
~RtpPacketReceived();
private:
int64_t arrival_time_ = std::numeric_limits<int64_t>::min();
int64_t arrival_time_ = (std::numeric_limits<int64_t>::min)();
};
#endif