From 1f3c93c77acd54f163e835a0cc1fb8fa0edc4e15 Mon Sep 17 00:00:00 2001 From: dijunkun Date: Mon, 10 Feb 2025 14:23:07 +0800 Subject: [PATCH] [feat] add robust throughput estimator --- ...cknowledged_bitrate_estimator_interface.cc | 44 +++- ...acknowledged_bitrate_estimator_interface.h | 38 ++++ src/qos/congestion_control.cpp | 8 + src/qos/robust_throughput_estimator.cc | 190 ++++++++++++++++++ src/qos/robust_throughput_estimator.h | 50 +++++ 5 files changed, 329 insertions(+), 1 deletion(-) create mode 100644 src/qos/robust_throughput_estimator.cc create mode 100644 src/qos/robust_throughput_estimator.h diff --git a/src/qos/acknowledged_bitrate_estimator_interface.cc b/src/qos/acknowledged_bitrate_estimator_interface.cc index 802456e..3a4a901 100644 --- a/src/qos/acknowledged_bitrate_estimator_interface.cc +++ b/src/qos/acknowledged_bitrate_estimator_interface.cc @@ -16,15 +16,57 @@ #include "acknowledged_bitrate_estimator.h" #include "api/units/time_delta.h" #include "log.h" +#include "robust_throughput_estimator.h" namespace webrtc { +constexpr char RobustThroughputEstimatorSettings::kKey[]; + +RobustThroughputEstimatorSettings::RobustThroughputEstimatorSettings() { + if (window_packets < 10 || 1000 < window_packets) { + LOG_WARN("Window size must be between 10 and 1000 packets"); + window_packets = 20; + } + if (max_window_packets < 10 || 1000 < max_window_packets) { + LOG_WARN("Max window size must be between 10 and 1000 packets"); + max_window_packets = 500; + } + max_window_packets = std::max(max_window_packets, window_packets); + + if (required_packets < 10 || 1000 < required_packets) { + LOG_WARN( + "Required number of initial packets must be between 10 and 1000 " + "packets"); + required_packets = 10; + } + required_packets = std::min(required_packets, window_packets); + + if (min_window_duration < TimeDelta::Millis(100) || + TimeDelta::Millis(3000) < min_window_duration) { + LOG_WARN("Window duration must be between 100 and 3000 ms"); + min_window_duration = TimeDelta::Millis(750); + } + if (max_window_duration < TimeDelta::Seconds(1) || + TimeDelta::Seconds(15) < max_window_duration) { + LOG_WARN("Max window duration must be between 1 and 15 s"); + max_window_duration = TimeDelta::Seconds(5); + } + min_window_duration = std::min(min_window_duration, max_window_duration); + + if (unacked_weight < 0.0 || 1.0 < unacked_weight) { + LOG_WARN("Weight for prior unacked size must be between 0 and 1."); + unacked_weight = 1.0; + } +} + AcknowledgedBitrateEstimatorInterface:: ~AcknowledgedBitrateEstimatorInterface() {} std::unique_ptr AcknowledgedBitrateEstimatorInterface::Create() { - return std::make_unique(); + // return std::make_unique(); + RobustThroughputEstimatorSettings settings; + return std::make_unique(settings); } } // namespace webrtc diff --git a/src/qos/acknowledged_bitrate_estimator_interface.h b/src/qos/acknowledged_bitrate_estimator_interface.h index dd9eb4e..c847d7a 100644 --- a/src/qos/acknowledged_bitrate_estimator_interface.h +++ b/src/qos/acknowledged_bitrate_estimator_interface.h @@ -22,6 +22,44 @@ namespace webrtc { +struct RobustThroughputEstimatorSettings { + static constexpr char kKey[] = "WebRTC-Bwe-RobustThroughputEstimatorSettings"; + + RobustThroughputEstimatorSettings(); + + // Set `enabled` to true to use the RobustThroughputEstimator, false to use + // the AcknowledgedBitrateEstimator. + bool enabled = true; + + // The estimator keeps the smallest window containing at least + // `window_packets` and at least the packets received during the last + // `min_window_duration` milliseconds. + // (This means that it may store more than `window_packets` at high bitrates, + // and a longer duration than `min_window_duration` at low bitrates.) + // However, if will never store more than kMaxPackets (for performance + // reasons), and never longer than max_window_duration (to avoid very old + // packets influencing the estimate for example when sending is paused). + unsigned window_packets = 20; + unsigned max_window_packets = 500; + TimeDelta min_window_duration = TimeDelta::Seconds(1); + TimeDelta max_window_duration = TimeDelta::Seconds(5); + + // The estimator window requires at least `required_packets` packets + // to produce an estimate. + unsigned required_packets = 10; + + // If audio packets aren't included in allocation (i.e. the + // estimated available bandwidth is divided only among the video + // streams), then `unacked_weight` should be set to 0. + // If audio packets are included in allocation, but not in bandwidth + // estimation (i.e. they don't have transport-wide sequence numbers, + // but we nevertheless divide the estimated available bandwidth among + // both audio and video streams), then `unacked_weight` should be set to 1. + // If all packets have transport-wide sequence numbers, then the value + // of `unacked_weight` doesn't matter. + double unacked_weight = 1.0; +}; + class AcknowledgedBitrateEstimatorInterface { public: static std::unique_ptr Create(); diff --git a/src/qos/congestion_control.cpp b/src/qos/congestion_control.cpp index 9978aae..51c6de2 100644 --- a/src/qos/congestion_control.cpp +++ b/src/qos/congestion_control.cpp @@ -133,9 +133,17 @@ NetworkControlUpdate CongestionControl::OnTransportPacketsFeedback( } previously_in_alr_ = alr_start_time.has_value(); + int count = 0; + for (auto r : report.SortedByReceiveTime()) { + count++; + LOG_WARN("{} packet.sent_packet.size: {}", count, + ToString(r.sent_packet.size)); + } + acknowledged_bitrate_estimator_->IncomingPacketFeedbackVector( report.SortedByReceiveTime()); auto acknowledged_bitrate = acknowledged_bitrate_estimator_->bitrate(); + LOG_WARN("acknowledged_bitrate:{}", acknowledged_bitrate->kbps()); // TODO: fix acknowledged_bitrate // acknowledged_bitrate = DataRate::KilobitsPerSec(1000); bandwidth_estimation_->SetAcknowledgedRate(acknowledged_bitrate, diff --git a/src/qos/robust_throughput_estimator.cc b/src/qos/robust_throughput_estimator.cc new file mode 100644 index 0000000..9228876 --- /dev/null +++ b/src/qos/robust_throughput_estimator.cc @@ -0,0 +1,190 @@ +/* + * Copyright (c) 2019 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#include "robust_throughput_estimator.h" + +#include + +#include +#include +#include +#include + +#include "acknowledged_bitrate_estimator_interface.h" +#include "api/transport/network_types.h" +#include "api/units/data_rate.h" +#include "api/units/data_size.h" +#include "api/units/time_delta.h" +#include "api/units/timestamp.h" +#include "log.h" + +namespace webrtc { + +RobustThroughputEstimator::RobustThroughputEstimator( + const RobustThroughputEstimatorSettings& settings) + : settings_(settings), + latest_discarded_send_time_(Timestamp::MinusInfinity()) {} + +RobustThroughputEstimator::~RobustThroughputEstimator() {} + +bool RobustThroughputEstimator::FirstPacketOutsideWindow() { + if (window_.empty()) return false; + if (window_.size() > settings_.max_window_packets) return true; + TimeDelta current_window_duration = + window_.back().receive_time - window_.front().receive_time; + if (current_window_duration > settings_.max_window_duration) return true; + if (window_.size() > settings_.window_packets && + current_window_duration > settings_.min_window_duration) { + return true; + } + return false; +} + +void RobustThroughputEstimator::IncomingPacketFeedbackVector( + const std::vector& packet_feedback_vector) { + for (const auto& packet : packet_feedback_vector) { + // Ignore packets without valid send or receive times. + // (This should not happen in production since lost packets are filtered + // out before passing the feedback vector to the throughput estimator. + // However, explicitly handling this case makes the estimator more robust + // and avoids a hard-to-detect bad state.) + if (packet.receive_time.IsInfinite() || + packet.sent_packet.send_time.IsInfinite()) { + continue; + } + + // Insert the new packet. + window_.push_back(packet); + window_.back().sent_packet.prior_unacked_data = + window_.back().sent_packet.prior_unacked_data * + settings_.unacked_weight; + // In most cases, receive timestamps should already be in order, but in the + // rare case where feedback packets have been reordered, we do some swaps to + // ensure that the window is sorted. + for (size_t i = window_.size() - 1; + i > 0 && window_[i].receive_time < window_[i - 1].receive_time; i--) { + std::swap(window_[i], window_[i - 1]); + } + constexpr TimeDelta kMaxReorderingTime = TimeDelta::Seconds(1); + const TimeDelta receive_delta = + (window_.back().receive_time - packet.receive_time); + if (receive_delta > kMaxReorderingTime) { + LOG_WARN("Severe packet re-ordering or timestamps offset changed: {}", + ToString(receive_delta)); + window_.clear(); + latest_discarded_send_time_ = Timestamp::MinusInfinity(); + } + } + + // Remove old packets. + while (FirstPacketOutsideWindow()) { + latest_discarded_send_time_ = std::max( + latest_discarded_send_time_, window_.front().sent_packet.send_time); + window_.pop_front(); + } +} + +std::optional RobustThroughputEstimator::bitrate() const { + if (window_.empty() || window_.size() < settings_.required_packets) + return std::nullopt; + + TimeDelta largest_recv_gap(TimeDelta::Zero()); + TimeDelta second_largest_recv_gap(TimeDelta::Zero()); + for (size_t i = 1; i < window_.size(); i++) { + // Find receive time gaps. + TimeDelta gap = window_[i].receive_time - window_[i - 1].receive_time; + if (gap > largest_recv_gap) { + second_largest_recv_gap = largest_recv_gap; + largest_recv_gap = gap; + } else if (gap > second_largest_recv_gap) { + second_largest_recv_gap = gap; + } + } + + Timestamp first_send_time = Timestamp::PlusInfinity(); + Timestamp last_send_time = Timestamp::MinusInfinity(); + Timestamp first_recv_time = Timestamp::PlusInfinity(); + Timestamp last_recv_time = Timestamp::MinusInfinity(); + DataSize recv_size = DataSize::Bytes(0); + DataSize send_size = DataSize::Bytes(0); + DataSize first_recv_size = DataSize::Bytes(0); + DataSize last_send_size = DataSize::Bytes(0); + size_t num_sent_packets_in_window = 0; + for (const auto& packet : window_) { + if (packet.receive_time < first_recv_time) { + first_recv_time = packet.receive_time; + first_recv_size = + packet.sent_packet.size + packet.sent_packet.prior_unacked_data; + } + last_recv_time = std::max(last_recv_time, packet.receive_time); + recv_size += packet.sent_packet.size; + recv_size += packet.sent_packet.prior_unacked_data; + + if (packet.sent_packet.send_time < latest_discarded_send_time_) { + // If we have dropped packets from the window that were sent after + // this packet, then this packet was reordered. Ignore it from + // the send rate computation (since the send time may be very far + // in the past, leading to underestimation of the send rate.) + // However, ignoring packets creates a risk that we end up without + // any packets left to compute a send rate. + continue; + } + if (packet.sent_packet.send_time > last_send_time) { + last_send_time = packet.sent_packet.send_time; + last_send_size = + packet.sent_packet.size + packet.sent_packet.prior_unacked_data; + } + first_send_time = std::min(first_send_time, packet.sent_packet.send_time); + + send_size += packet.sent_packet.size; + send_size += packet.sent_packet.prior_unacked_data; + ++num_sent_packets_in_window; + } + + // Suppose a packet of size S is sent every T milliseconds. + // A window of N packets would contain N*S bytes, but the time difference + // between the first and the last packet would only be (N-1)*T. Thus, we + // need to remove the size of one packet to get the correct rate of S/T. + // Which packet to remove (if the packets have varying sizes), + // depends on the network model. + // Suppose that 2 packets with sizes s1 and s2, are received at times t1 + // and t2, respectively. If the packets were transmitted back to back over + // a bottleneck with rate capacity r, then we'd expect t2 = t1 + r * s2. + // Thus, r = (t2-t1) / s2, so the size of the first packet doesn't affect + // the difference between t1 and t2. + // Analoguously, if the first packet is sent at time t1 and the sender + // paces the packets at rate r, then the second packet can be sent at time + // t2 = t1 + r * s1. Thus, the send rate estimate r = (t2-t1) / s1 doesn't + // depend on the size of the last packet. + recv_size -= first_recv_size; + send_size -= last_send_size; + + // Remove the largest gap by replacing it by the second largest gap. + // This is to ensure that spurious "delay spikes" (i.e. when the + // network stops transmitting packets for a short period, followed + // by a burst of delayed packets), don't cause the estimate to drop. + // This could cause an overestimation, which we guard against by + // never returning an estimate above the send rate. + TimeDelta recv_duration = (last_recv_time - first_recv_time) - + largest_recv_gap + second_largest_recv_gap; + recv_duration = std::max(recv_duration, TimeDelta::Millis(1)); + + if (num_sent_packets_in_window < settings_.required_packets) { + // Too few send times to calculate a reliable send rate. + return recv_size / recv_duration; + } + + TimeDelta send_duration = last_send_time - first_send_time; + send_duration = std::max(send_duration, TimeDelta::Millis(1)); + + return std::min(send_size / send_duration, recv_size / recv_duration); +} + +} // namespace webrtc diff --git a/src/qos/robust_throughput_estimator.h b/src/qos/robust_throughput_estimator.h new file mode 100644 index 0000000..dc61409 --- /dev/null +++ b/src/qos/robust_throughput_estimator.h @@ -0,0 +1,50 @@ +/* + * Copyright (c) 2019 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#ifndef MODULES_CONGESTION_CONTROLLER_GOOG_CC_ROBUST_THROUGHPUT_ESTIMATOR_H_ +#define MODULES_CONGESTION_CONTROLLER_GOOG_CC_ROBUST_THROUGHPUT_ESTIMATOR_H_ + +#include +#include +#include + +#include "acknowledged_bitrate_estimator_interface.h" +#include "api/transport/network_types.h" +#include "api/units/data_rate.h" +#include "api/units/timestamp.h" + +namespace webrtc { + +class RobustThroughputEstimator : public AcknowledgedBitrateEstimatorInterface { + public: + explicit RobustThroughputEstimator( + const RobustThroughputEstimatorSettings& settings); + ~RobustThroughputEstimator() override; + + void IncomingPacketFeedbackVector( + const std::vector& packet_feedback_vector) override; + + std::optional bitrate() const override; + + std::optional PeekRate() const override { return bitrate(); } + void SetAlr(bool /*in_alr*/) override {} + void SetAlrEndedTime(Timestamp /*alr_ended_time*/) override {} + + private: + bool FirstPacketOutsideWindow(); + + const RobustThroughputEstimatorSettings settings_; + std::deque window_; + Timestamp latest_discarded_send_time_ = Timestamp::MinusInfinity(); +}; + +} // namespace webrtc + +#endif // MODULES_CONGESTION_CONTROLLER_GOOG_CC_ROBUST_THROUGHPUT_ESTIMATOR_H_