[feat] add pacing controller module

This commit is contained in:
dijunkun
2025-03-12 18:18:19 +08:00
parent 2bf60a9c81
commit 23df1f3b60
15 changed files with 2320 additions and 9 deletions

View File

@@ -0,0 +1,65 @@
/*
* @Author: DI JUNKUN
* @Date: 2025-03-12
* Copyright (c) 2025 by DI JUNKUN, All Rights Reserved.
*/
#ifndef _INLINED_VECTOR_H_
#define _INLINED_VECTOR_H_
#include <array>
#include <initializer_list>
#include <iostream>
#include <vector>
template <typename T, size_t N>
class InlinedVector {
public:
InlinedVector() : size_(0), use_heap_(false) {}
void push_back(const T& value) {
if (!use_heap_ && size_ < N) {
stack_data_[size_] = value;
} else {
if (!use_heap_) {
heap_data_.reserve(N * 2);
for (size_t i = 0; i < size_; ++i) {
heap_data_.push_back(stack_data_[i]);
}
use_heap_ = true;
}
heap_data_.push_back(value);
}
++size_;
}
void assign(size_t n, const T& value) {
clear();
for (size_t i = 0; i < n; ++i) {
push_back(value);
}
}
size_t size() const { return size_; }
T& operator[](size_t index) {
return use_heap_ ? heap_data_[index] : stack_data_[index];
}
const T& operator[](size_t index) const {
return use_heap_ ? heap_data_[index] : stack_data_[index];
}
private:
void clear() {
size_ = 0;
use_heap_ = false;
heap_data_.clear();
}
size_t size_;
bool use_heap_;
std::array<T, N> stack_data_;
std::vector<T> heap_data_;
};
#endif // _INLINED_VECTOR_H_

192
src/qos/bitrate_prober.cc Normal file
View File

@@ -0,0 +1,192 @@
/*
* Copyright (c) 2014 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#include "bitrate_prober.h"
#include <algorithm>
#include <cstddef>
#include <optional>
#include "api/transport/network_types.h"
#include "api/units/data_rate.h"
#include "api/units/data_size.h"
#include "api/units/time_delta.h"
#include "api/units/timestamp.h"
#include "log.h"
namespace webrtc {
namespace {
constexpr TimeDelta kProbeClusterTimeout = TimeDelta::Seconds(5);
constexpr size_t kMaxPendingProbeClusters = 5;
} // namespace
BitrateProberConfig::BitrateProberConfig()
: max_probe_delay(TimeDelta::Millis(10)),
min_packet_size(DataSize::Bytes(200)) {}
BitrateProber::BitrateProber()
: probing_state_(ProbingState::kDisabled),
next_probe_time_(Timestamp::PlusInfinity()) {
SetEnabled(true);
}
void BitrateProber::SetEnabled(bool enable) {
if (enable) {
if (probing_state_ == ProbingState::kDisabled) {
probing_state_ = ProbingState::kInactive;
LOG_INFO("Bandwidth probing enabled, set to inactive");
}
} else {
probing_state_ = ProbingState::kDisabled;
LOG_INFO("Bandwidth probing disabled");
}
}
void BitrateProber::SetAllowProbeWithoutMediaPacket(bool allow) {
config_.allow_start_probing_immediately = allow;
MaybeSetActiveState(/*packet_size=*/DataSize::Zero());
}
void BitrateProber::MaybeSetActiveState(DataSize packet_size) {
if (ReadyToSetActiveState(packet_size)) {
next_probe_time_ = Timestamp::MinusInfinity();
probing_state_ = ProbingState::kActive;
}
}
bool BitrateProber::ReadyToSetActiveState(DataSize packet_size) const {
if (clusters_.empty()) {
return false;
}
switch (probing_state_) {
case ProbingState::kDisabled:
case ProbingState::kActive:
return false;
case ProbingState::kInactive:
if (config_.allow_start_probing_immediately) {
return true;
}
// If config_.min_packet_size > 0, a "large enough" packet must be
// sent first, before a probe can be generated and sent. Otherwise,
// send the probe asap.
return packet_size >=
std::min(RecommendedMinProbeSize(), config_.min_packet_size);
}
return false;
}
void BitrateProber::OnIncomingPacket(DataSize packet_size) {
MaybeSetActiveState(packet_size);
}
void BitrateProber::CreateProbeCluster(
const ProbeClusterConfig& cluster_config) {
while (!clusters_.empty() &&
(cluster_config.at_time - clusters_.front().requested_at >
kProbeClusterTimeout ||
clusters_.size() > kMaxPendingProbeClusters)) {
clusters_.pop();
}
ProbeCluster cluster;
cluster.requested_at = cluster_config.at_time;
cluster.pace_info.probe_cluster_min_probes =
cluster_config.target_probe_count;
cluster.pace_info.probe_cluster_min_bytes =
(cluster_config.target_data_rate * cluster_config.target_duration)
.bytes();
cluster.min_probe_delta = cluster_config.min_probe_delta;
cluster.pace_info.send_bitrate = cluster_config.target_data_rate;
cluster.pace_info.probe_cluster_id = cluster_config.id;
clusters_.push(cluster);
MaybeSetActiveState(/*packet_size=*/DataSize::Zero());
LOG_INFO("Probe cluster (bitrate_bps:min bytes:min packets): ({}:{}:{}, {})",
cluster.pace_info.send_bitrate.bps(),
cluster.pace_info.probe_cluster_min_bytes,
cluster.pace_info.probe_cluster_min_probes,
probing_state_ == ProbingState::kInactive ? "Inactive" : "Active");
}
Timestamp BitrateProber::NextProbeTime(Timestamp /* now */) const {
// Probing is not active or probing is already complete.
if (probing_state_ != ProbingState::kActive || clusters_.empty()) {
return Timestamp::PlusInfinity();
}
return next_probe_time_;
}
std::optional<PacedPacketInfo> BitrateProber::CurrentCluster(Timestamp now) {
if (clusters_.empty() || probing_state_ != ProbingState::kActive) {
return std::nullopt;
}
if (next_probe_time_.IsFinite() &&
now - next_probe_time_ > config_.max_probe_delay) {
LOG_WARN(
"Probe delay too high (next_ms:{}, now_ms: {}), discarding probe "
"cluster.",
next_probe_time_.ms(), now.ms());
clusters_.pop();
if (clusters_.empty()) {
probing_state_ = ProbingState::kInactive;
return std::nullopt;
}
}
PacedPacketInfo info = clusters_.front().pace_info;
info.probe_cluster_bytes_sent = clusters_.front().sent_bytes;
return info;
}
DataSize BitrateProber::RecommendedMinProbeSize() const {
if (clusters_.empty()) {
return DataSize::Zero();
}
DataRate send_rate = clusters_.front().pace_info.send_bitrate;
return send_rate * clusters_.front().min_probe_delta;
}
void BitrateProber::ProbeSent(Timestamp now, DataSize size) {
if (!clusters_.empty()) {
ProbeCluster* cluster = &clusters_.front();
if (cluster->sent_probes == 0) {
cluster->started_at = now;
}
cluster->sent_bytes += size.bytes<int>();
cluster->sent_probes += 1;
next_probe_time_ = CalculateNextProbeTime(*cluster);
if (cluster->sent_bytes >= cluster->pace_info.probe_cluster_min_bytes &&
cluster->sent_probes >= cluster->pace_info.probe_cluster_min_probes) {
clusters_.pop();
}
if (clusters_.empty()) {
probing_state_ = ProbingState::kInactive;
}
}
}
Timestamp BitrateProber::CalculateNextProbeTime(
const ProbeCluster& cluster) const {
// Compute the time delta from the cluster start to ensure probe bitrate stays
// close to the target bitrate. Result is in milliseconds.
DataSize sent_bytes = DataSize::Bytes(cluster.sent_bytes);
DataRate send_bitrate = cluster.pace_info.send_bitrate;
TimeDelta delta = sent_bytes / send_bitrate;
return cluster.started_at + delta;
}
} // namespace webrtc

129
src/qos/bitrate_prober.h Normal file
View File

@@ -0,0 +1,129 @@
/*
* Copyright (c) 2014 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#ifndef MODULES_PACING_BITRATE_PROBER_H_
#define MODULES_PACING_BITRATE_PROBER_H_
#include <stddef.h>
#include <optional>
#include <queue>
#include "api/transport/network_types.h"
#include "api/units/data_size.h"
#include "api/units/time_delta.h"
#include "api/units/timestamp.h"
namespace webrtc {
struct BitrateProberConfig {
explicit BitrateProberConfig();
BitrateProberConfig(const BitrateProberConfig&) = default;
BitrateProberConfig& operator=(const BitrateProberConfig&) = default;
~BitrateProberConfig() = default;
// Maximum amount of time each probe can be delayed.
TimeDelta max_probe_delay;
// This is used to start sending a probe after a large enough packet.
// The min packet size is scaled with the bitrate we're probing at.
// This defines the max min packet size, meaning that on high bitrates
// a packet of at least this size is needed to trigger sending a probe.
DataSize min_packet_size;
// If true, `min_packet_size` is ignored.
bool allow_start_probing_immediately = false;
};
// Note that this class isn't thread-safe by itself and therefore relies
// on being protected by the caller.
class BitrateProber {
public:
explicit BitrateProber();
~BitrateProber() = default;
void SetEnabled(bool enable);
void SetAllowProbeWithoutMediaPacket(bool allow);
// Returns true if the prober is in a probing session, i.e., it currently
// wants packets to be sent out according to the time returned by
// TimeUntilNextProbe().
bool is_probing() const { return probing_state_ == ProbingState::kActive; }
// Initializes a new probing session if the prober is allowed to probe. Does
// not initialize the prober unless the packet size is large enough to probe
// with.
void OnIncomingPacket(DataSize packet_size);
// Create a cluster used to probe.
void CreateProbeCluster(const ProbeClusterConfig& cluster_config);
// Returns the time at which the next probe should be sent to get accurate
// probing. If probing is not desired at this time, Timestamp::PlusInfinity()
// will be returned.
// TODO(bugs.webrtc.org/11780): Remove `now` argument when old mode is gone.
Timestamp NextProbeTime(Timestamp now) const;
// Information about the current probing cluster.
std::optional<PacedPacketInfo> CurrentCluster(Timestamp now);
// Returns the minimum number of bytes that the prober recommends for
// the next probe, or zero if not probing. A probe can consist of multiple
// packets that are sent back to back.
DataSize RecommendedMinProbeSize() const;
// Called to report to the prober that a probe has been sent. In case of
// multiple packets per probe, this call would be made at the end of sending
// the last packet in probe. `size` is the total size of all packets in probe.
void ProbeSent(Timestamp now, DataSize size);
private:
enum class ProbingState {
// Probing will not be triggered in this state at all times.
kDisabled,
// Probing is enabled and ready to trigger on the first packet arrival if
// there is a probe cluster.
kInactive,
// Probe cluster is filled with the set of data rates to be probed and
// probes are being sent.
kActive,
};
// A probe cluster consists of a set of probes. Each probe in turn can be
// divided into a number of packets to accommodate the MTU on the network.
struct ProbeCluster {
PacedPacketInfo pace_info;
int sent_probes = 0;
int sent_bytes = 0;
TimeDelta min_probe_delta = TimeDelta::Zero();
Timestamp requested_at = Timestamp::MinusInfinity();
Timestamp started_at = Timestamp::MinusInfinity();
};
Timestamp CalculateNextProbeTime(const ProbeCluster& cluster) const;
void MaybeSetActiveState(DataSize packet_size);
bool ReadyToSetActiveState(DataSize packet_size) const;
ProbingState probing_state_;
// Probe bitrate per packet. These are used to compute the delta relative to
// the previous probe packet based on the size and time when that packet was
// sent.
std::queue<ProbeCluster> clusters_;
// Time the next probe should be sent when in kActive state.
Timestamp next_probe_time_;
BitrateProberConfig config_;
};
} // namespace webrtc
#endif // MODULES_PACING_BITRATE_PROBER_H_

View File

@@ -0,0 +1,691 @@
/*
* Copyright (c) 2019 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#include "pacing_controller.h"
#include <algorithm>
#include <array>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <optional>
#include <utility>
#include <vector>
#include "api/array_view.h"
#include "api/clock/clock.h"
#include "api/transport/network_types.h"
#include "api/units/data_rate.h"
#include "api/units/data_size.h"
#include "api/units/time_delta.h"
#include "api/units/timestamp.h"
#include "bitrate_prober.h"
#include "log.h"
#include "rtc_base/numerics/safe_conversions.h"
#include "rtp_rtcp_defines.h"
namespace webrtc {
namespace {
constexpr TimeDelta kCongestedPacketInterval = TimeDelta::Millis(500);
// TODO(sprang): Consider dropping this limit.
// The maximum debt level, in terms of time, capped when sending packets.
constexpr TimeDelta kMaxDebtInTime = TimeDelta::Millis(500);
constexpr TimeDelta kMaxElapsedTime = TimeDelta::Seconds(2);
} // namespace
const TimeDelta PacingController::kPausedProcessInterval =
kCongestedPacketInterval;
const TimeDelta PacingController::kMinSleepTime = TimeDelta::Millis(1);
const TimeDelta PacingController::kTargetPaddingDuration = TimeDelta::Millis(5);
const TimeDelta PacingController::kMaxPaddingReplayDuration =
TimeDelta::Millis(50);
const TimeDelta PacingController::kMaxEarlyProbeProcessing =
TimeDelta::Millis(1);
PacingController::PacingController(Clock* clock, PacketSender* packet_sender,
Configuration configuration)
: clock_(clock),
packet_sender_(packet_sender),
drain_large_queues_(configuration.drain_large_queues),
send_padding_if_silent_(true),
pace_audio_(false),
ignore_transport_overhead_(false),
fast_retransmissions_(true),
keyframe_flushing_(configuration.keyframe_flushing),
transport_overhead_per_packet_(DataSize::Zero()),
send_burst_interval_(configuration.send_burst_interval),
last_timestamp_(clock->CurrentTime()),
paused_(false),
media_debt_(DataSize::Zero()),
padding_debt_(DataSize::Zero()),
pacing_rate_(DataRate::Zero()),
adjusted_media_rate_(DataRate::Zero()),
padding_rate_(DataRate::Zero()),
probing_send_failure_(false),
last_process_time_(clock->CurrentTime()),
last_send_time_(last_process_time_),
seen_first_packet_(false),
packet_queue_(/*creation_time=*/last_process_time_,
configuration.prioritize_audio_retransmission,
configuration.packet_queue_ttl),
congested_(false),
queue_time_limit_(configuration.queue_time_limit),
account_for_audio_(false),
include_overhead_(false),
circuit_breaker_threshold_(1 << 16) {
if (!drain_large_queues_) {
LOG_WARN(
"Pacer queues will not be drained, pushback experiment must be "
"enabled.");
}
}
PacingController::~PacingController() = default;
void PacingController::CreateProbeClusters(
rtc::ArrayView<const ProbeClusterConfig> probe_cluster_configs) {
for (const ProbeClusterConfig probe_cluster_config : probe_cluster_configs) {
prober_.CreateProbeCluster(probe_cluster_config);
}
}
void PacingController::Pause() {
if (!paused_) {
LOG_INFO("PacedSender paused.");
}
paused_ = true;
packet_queue_.SetPauseState(true, CurrentTime());
}
void PacingController::Resume() {
if (paused_) {
LOG_INFO("PacedSender resumed.");
}
paused_ = false;
packet_queue_.SetPauseState(false, CurrentTime());
}
bool PacingController::IsPaused() const { return paused_; }
void PacingController::SetCongested(bool congested) {
if (congested_ && !congested) {
UpdateBudgetWithElapsedTime(UpdateTimeAndGetElapsed(CurrentTime()));
}
congested_ = congested;
}
void PacingController::SetCircuitBreakerThreshold(int num_iterations) {
circuit_breaker_threshold_ = num_iterations;
}
void PacingController::RemovePacketsForSsrc(uint32_t ssrc) {
packet_queue_.RemovePacketsForSsrc(ssrc);
}
bool PacingController::IsProbing() const { return prober_.is_probing(); }
Timestamp PacingController::CurrentTime() const {
Timestamp time = clock_->CurrentTime();
if (time < last_timestamp_) {
LOG_WARN(
"Non-monotonic clock behavior observed. Previous timestamp: {}, new "
"timestamp: {}",
last_timestamp_.ms(), time.ms());
time = last_timestamp_;
}
last_timestamp_ = time;
return time;
}
void PacingController::SetProbingEnabled(bool enabled) {
prober_.SetEnabled(enabled);
}
void PacingController::SetPacingRates(DataRate pacing_rate,
DataRate padding_rate) {
if (padding_rate > pacing_rate) {
LOG_WARN(
"Padding rate {}kbps is higher than the pacing rate {}kbps, capping.",
padding_rate.kbps(), pacing_rate.kbps());
padding_rate = pacing_rate;
}
if (pacing_rate > max_rate || padding_rate > max_rate) {
LOG_WARN(
"Very high pacing rates ( > {} kbps) configured: pacing = {} kbps, "
"padding = {}kbps.",
max_rate.kbps(), pacing_rate.kbps(), padding_rate.kbps());
max_rate = std::max(pacing_rate, padding_rate) * 1.1;
}
pacing_rate_ = pacing_rate;
padding_rate_ = padding_rate;
MaybeUpdateMediaRateDueToLongQueue(CurrentTime());
LOG_INFO("bwe:pacer_updated pacing_kbps={} padding_budget_kbps={}",
pacing_rate_.kbps(), padding_rate.kbps());
}
void PacingController::EnqueuePacket(std::unique_ptr<RtpPacketToSend> packet) {
if (keyframe_flushing_ &&
packet->packet_type() == RtpPacketMediaType::kVideo &&
packet->is_key_frame() && packet->is_first_packet_of_frame() &&
!packet_queue_.HasKeyframePackets(packet->Ssrc())) {
// First packet of a keyframe (and no keyframe packets currently in the
// queue). Flush any pending packets currently in the queue for that stream
// in order to get the new keyframe out as quickly as possible.
packet_queue_.RemovePacketsForSsrc(packet->Ssrc());
std::optional<uint32_t> rtx_ssrc =
packet_sender_->GetRtxSsrcForMedia(packet->Ssrc());
if (rtx_ssrc) {
packet_queue_.RemovePacketsForSsrc(*rtx_ssrc);
}
}
prober_.OnIncomingPacket(DataSize::Bytes(packet->payload_size()));
const Timestamp now = CurrentTime();
if (packet_queue_.Empty()) {
// If queue is empty, we need to "fast-forward" the last process time,
// so that we don't use passed time as budget for sending the first new
// packet.
Timestamp target_process_time = now;
Timestamp next_send_time = NextSendTime();
if (next_send_time.IsFinite()) {
// There was already a valid planned send time, such as a keep-alive.
// Use that as last process time only if it's prior to now.
target_process_time = std::min(now, next_send_time);
}
UpdateBudgetWithElapsedTime(UpdateTimeAndGetElapsed(target_process_time));
}
packet_queue_.Push(now, std::move(packet));
seen_first_packet_ = true;
// Queue length has increased, check if we need to change the pacing rate.
MaybeUpdateMediaRateDueToLongQueue(now);
}
void PacingController::SetAccountForAudioPackets(bool account_for_audio) {
account_for_audio_ = account_for_audio;
}
void PacingController::SetIncludeOverhead() { include_overhead_ = true; }
void PacingController::SetTransportOverhead(DataSize overhead_per_packet) {
if (ignore_transport_overhead_) return;
transport_overhead_per_packet_ = overhead_per_packet;
}
void PacingController::SetSendBurstInterval(TimeDelta burst_interval) {
send_burst_interval_ = burst_interval;
}
void PacingController::SetAllowProbeWithoutMediaPacket(bool allow) {
prober_.SetAllowProbeWithoutMediaPacket(allow);
}
TimeDelta PacingController::ExpectedQueueTime() const {
return QueueSizeData() / adjusted_media_rate_;
}
size_t PacingController::QueueSizePackets() const {
return rtc::checked_cast<size_t>(packet_queue_.SizeInPackets());
}
const std::array<int, kNumMediaTypes>&
PacingController::SizeInPacketsPerRtpPacketMediaType() const {
return packet_queue_.SizeInPacketsPerRtpPacketMediaType();
}
DataSize PacingController::QueueSizeData() const {
DataSize size = packet_queue_.SizeInPayloadBytes();
if (include_overhead_) {
size += static_cast<int64_t>(packet_queue_.SizeInPackets()) *
transport_overhead_per_packet_;
}
return size;
}
DataSize PacingController::CurrentBufferLevel() const {
return std::max(media_debt_, padding_debt_);
}
std::optional<Timestamp> PacingController::FirstSentPacketTime() const {
return first_sent_packet_time_;
}
Timestamp PacingController::OldestPacketEnqueueTime() const {
return packet_queue_.OldestEnqueueTime();
}
TimeDelta PacingController::UpdateTimeAndGetElapsed(Timestamp now) {
// If no previous processing, or last process was "in the future" because of
// early probe processing, then there is no elapsed time to add budget for.
if (last_process_time_.IsMinusInfinity() || now < last_process_time_) {
return TimeDelta::Zero();
}
TimeDelta elapsed_time = now - last_process_time_;
last_process_time_ = now;
if (elapsed_time > kMaxElapsedTime) {
LOG_WARN("Elapsed time ({}) longer than expected, limiting to {}",
elapsed_time.seconds(), kMaxElapsedTime.seconds());
elapsed_time = kMaxElapsedTime;
}
return elapsed_time;
}
bool PacingController::ShouldSendKeepalive(Timestamp now) const {
if (send_padding_if_silent_ || paused_ || congested_ || !seen_first_packet_) {
// We send a padding packet every 500 ms to ensure we won't get stuck in
// congested state due to no feedback being received.
if (now - last_send_time_ >= kCongestedPacketInterval) {
return true;
}
}
return false;
}
Timestamp PacingController::NextSendTime() const {
const Timestamp now = CurrentTime();
Timestamp next_send_time = Timestamp::PlusInfinity();
if (paused_) {
return last_send_time_ + kPausedProcessInterval;
}
// If probing is active, that always takes priority.
if (prober_.is_probing() && !probing_send_failure_) {
Timestamp probe_time = prober_.NextProbeTime(now);
if (!probe_time.IsPlusInfinity()) {
return probe_time.IsMinusInfinity() ? now : probe_time;
}
}
// If queue contains a packet which should not be paced, its target send time
// is the time at which it was enqueued.
Timestamp unpaced_send_time = NextUnpacedSendTime();
if (unpaced_send_time.IsFinite()) {
return unpaced_send_time;
}
if (congested_ || !seen_first_packet_) {
// We need to at least send keep-alive packets with some interval.
return last_send_time_ + kCongestedPacketInterval;
}
if (adjusted_media_rate_ > DataRate::Zero() && !packet_queue_.Empty()) {
// If packets are allowed to be sent in a burst, the
// debt is allowed to grow up to one packet more than what can be sent
// during 'send_burst_period_'.
TimeDelta drain_time = media_debt_ / adjusted_media_rate_;
// Ensure that a burst of sent packet is not larger than kMaxBurstSize in
// order to not risk overfilling socket buffers at high bitrate.
TimeDelta send_burst_interval =
std::min(send_burst_interval_, kMaxBurstSize / adjusted_media_rate_);
next_send_time =
last_process_time_ +
((send_burst_interval > drain_time) ? TimeDelta::Zero() : drain_time);
} else if (padding_rate_ > DataRate::Zero() && packet_queue_.Empty()) {
// If we _don't_ have pending packets, check how long until we have
// bandwidth for padding packets. Both media and padding debts must
// have been drained to do this.
TimeDelta drain_time = std::max(media_debt_ / adjusted_media_rate_,
padding_debt_ / padding_rate_);
if (drain_time.IsZero() &&
(!media_debt_.IsZero() || !padding_debt_.IsZero())) {
// We have a non-zero debt, but drain time is smaller than tick size of
// TimeDelta, round it up to the smallest possible non-zero delta.
drain_time = TimeDelta::Micros(1);
}
next_send_time = last_process_time_ + drain_time;
} else {
// Nothing to do.
next_send_time = last_process_time_ + kPausedProcessInterval;
}
if (send_padding_if_silent_) {
next_send_time =
std::min(next_send_time, last_send_time_ + kPausedProcessInterval);
}
return next_send_time;
}
void PacingController::ProcessPackets() {
auto cleanup = std::unique_ptr<void, std::function<void(void*)>>(
nullptr, [packet_sender = packet_sender_](void*) {
packet_sender->OnBatchComplete();
});
const Timestamp now = CurrentTime();
Timestamp target_send_time = now;
if (ShouldSendKeepalive(now)) {
DataSize keepalive_data_sent = DataSize::Zero();
// We can not send padding unless a normal packet has first been sent. If
// we do, timestamps get messed up.
if (seen_first_packet_) {
std::vector<std::unique_ptr<RtpPacketToSend>> keepalive_packets =
packet_sender_->GeneratePadding(DataSize::Bytes(1));
for (auto& packet : keepalive_packets) {
keepalive_data_sent +=
DataSize::Bytes(packet->payload_size() + packet->padding_size());
packet_sender_->SendPacket(std::move(packet), PacedPacketInfo());
for (auto& packet : packet_sender_->FetchFec()) {
EnqueuePacket(std::move(packet));
}
}
}
OnPacketSent(RtpPacketMediaType::kPadding, keepalive_data_sent, now);
}
if (paused_) {
return;
}
TimeDelta early_execute_margin =
prober_.is_probing() ? kMaxEarlyProbeProcessing : TimeDelta::Zero();
target_send_time = NextSendTime();
if (now + early_execute_margin < target_send_time) {
// We are too early, but if queue is empty still allow draining some debt.
// Probing is allowed to be sent up to kMinSleepTime early.
UpdateBudgetWithElapsedTime(UpdateTimeAndGetElapsed(now));
return;
}
TimeDelta elapsed_time = UpdateTimeAndGetElapsed(target_send_time);
if (elapsed_time > TimeDelta::Zero()) {
UpdateBudgetWithElapsedTime(elapsed_time);
}
PacedPacketInfo pacing_info;
DataSize recommended_probe_size = DataSize::Zero();
bool is_probing = prober_.is_probing();
if (is_probing) {
// Probe timing is sensitive, and handled explicitly by BitrateProber, so
// use actual send time rather than target.
pacing_info = prober_.CurrentCluster(now).value_or(PacedPacketInfo());
if (pacing_info.probe_cluster_id != PacedPacketInfo::kNotAProbe) {
recommended_probe_size = prober_.RecommendedMinProbeSize();
} else {
// No valid probe cluster returned, probe might have timed out.
is_probing = false;
}
}
DataSize data_sent = DataSize::Zero();
int iteration = 0;
int packets_sent = 0;
int padding_packets_generated = 0;
for (; iteration < circuit_breaker_threshold_; ++iteration) {
// Fetch packet, so long as queue is not empty or budget is not
// exhausted.
std::unique_ptr<RtpPacketToSend> rtp_packet =
GetPendingPacket(pacing_info, target_send_time, now);
if (rtp_packet == nullptr) {
// No packet available to send, check if we should send padding.
if (now - target_send_time > kMaxPaddingReplayDuration) {
// The target send time is more than `kMaxPaddingReplayDuration` behind
// the real-time clock. This can happen if the clock is adjusted forward
// without `ProcessPackets()` having been called at the expected times.
target_send_time = now - kMaxPaddingReplayDuration;
last_process_time_ = std::max(last_process_time_, target_send_time);
}
DataSize padding_to_add = PaddingToAdd(recommended_probe_size, data_sent);
if (padding_to_add > DataSize::Zero()) {
std::vector<std::unique_ptr<RtpPacketToSend>> padding_packets =
packet_sender_->GeneratePadding(padding_to_add);
if (!padding_packets.empty()) {
padding_packets_generated += padding_packets.size();
for (auto& packet : padding_packets) {
EnqueuePacket(std::move(packet));
}
// Continue loop to send the padding that was just added.
continue;
} else {
// Can't generate padding, still update padding budget for next send
// time.
UpdatePaddingBudgetWithSentData(padding_to_add);
}
}
// Can't fetch new packet and no padding to send, exit send loop.
break;
} else {
const RtpPacketMediaType packet_type = *rtp_packet->packet_type();
DataSize packet_size = DataSize::Bytes(rtp_packet->payload_size() +
rtp_packet->padding_size());
if (include_overhead_) {
packet_size += DataSize::Bytes(rtp_packet->headers_size()) +
transport_overhead_per_packet_;
}
packet_sender_->SendPacket(std::move(rtp_packet), pacing_info);
for (auto& packet : packet_sender_->FetchFec()) {
EnqueuePacket(std::move(packet));
}
data_sent += packet_size;
++packets_sent;
// Send done, update send time.
OnPacketSent(packet_type, packet_size, now);
if (is_probing) {
pacing_info.probe_cluster_bytes_sent += packet_size.bytes();
// If we are currently probing, we need to stop the send loop when we
// have reached the send target.
if (data_sent >= recommended_probe_size) {
break;
}
}
// Update target send time in case that are more packets that we are late
// in processing.
target_send_time = NextSendTime();
if (target_send_time > now) {
// Exit loop if not probing.
if (!is_probing) {
break;
}
target_send_time = now;
}
UpdateBudgetWithElapsedTime(UpdateTimeAndGetElapsed(target_send_time));
}
}
if (iteration >= circuit_breaker_threshold_) {
// Circuit break activated. Log warning, adjust send time and return.
// TODO(sprang): Consider completely clearing state.
LOG_ERROR(
"PacingController exceeded max iterations in send-loop. Debug info: "
"packets sent = {}, padding packets generated = {}, bytes sent = {}, "
"probing = {}, recommended_probe_size = {}, now = {}, target_send_time "
"= {}, "
"last_process_time = {}, last_send_time = {}, paused = {}, media_debt "
"= {}, "
"padding_debt = {}, pacing_rate = {}, adjusted_media_rate = {}, "
"padding_rate = {}, "
"queue size (packets) = {}, queue size (payload bytes) = {}",
packets_sent, padding_packets_generated, data_sent.bytes(),
(is_probing ? "true" : "false"), recommended_probe_size.bytes(),
now.us(), target_send_time.us(), last_process_time_.us(),
last_send_time_.us(), (paused_ ? "true" : "false"), media_debt_.bytes(),
padding_debt_.bytes(), pacing_rate_.bps(), adjusted_media_rate_.bps(),
padding_rate_.bps(), packet_queue_.SizeInPackets(),
ToString(packet_queue_.SizeInPayloadBytes()));
last_send_time_ = now;
last_process_time_ = now;
return;
}
if (is_probing) {
probing_send_failure_ = data_sent == DataSize::Zero();
if (!probing_send_failure_) {
prober_.ProbeSent(CurrentTime(), data_sent);
}
}
// Queue length has probably decreased, check if pacing rate needs to updated.
// Poll the time again, since we might have enqueued new fec/padding packets
// with a later timestamp than `now`.
MaybeUpdateMediaRateDueToLongQueue(CurrentTime());
}
DataSize PacingController::PaddingToAdd(DataSize recommended_probe_size,
DataSize data_sent) const {
if (!packet_queue_.Empty()) {
// Actual payload available, no need to add padding.
return DataSize::Zero();
}
if (congested_) {
// Don't add padding if congested, even if requested for probing.
return DataSize::Zero();
}
if (!recommended_probe_size.IsZero()) {
if (recommended_probe_size > data_sent) {
return recommended_probe_size - data_sent;
}
return DataSize::Zero();
}
if (padding_rate_ > DataRate::Zero() && padding_debt_ == DataSize::Zero()) {
return kTargetPaddingDuration * padding_rate_;
}
return DataSize::Zero();
}
std::unique_ptr<RtpPacketToSend> PacingController::GetPendingPacket(
const PacedPacketInfo& pacing_info, Timestamp target_send_time,
Timestamp now) {
const bool is_probe =
pacing_info.probe_cluster_id != PacedPacketInfo::kNotAProbe;
// If first packet in probe, insert a small padding packet so we have a
// more reliable start window for the rate estimation.
if (is_probe && pacing_info.probe_cluster_bytes_sent == 0) {
auto padding = packet_sender_->GeneratePadding(DataSize::Bytes(1));
// If no RTP modules sending media are registered, we may not get a
// padding packet back.
if (!padding.empty()) {
// We should never get more than one padding packets with a requested
// size of 1 byte.
return std::move(padding[0]);
}
}
if (packet_queue_.Empty()) {
return nullptr;
}
// First, check if there is any reason _not_ to send the next queued packet.
// Unpaced packets and probes are exempted from send checks.
if (NextUnpacedSendTime().IsInfinite() && !is_probe) {
if (congested_) {
// Don't send anything if congested.
return nullptr;
}
if (now <= target_send_time && send_burst_interval_.IsZero()) {
// We allow sending slightly early if we think that we would actually
// had been able to, had we been right on time - i.e. the current debt
// is not more than would be reduced to zero at the target sent time.
// If we allow packets to be sent in a burst, packet are allowed to be
// sent early.
TimeDelta flush_time = media_debt_ / adjusted_media_rate_;
if (now + flush_time > target_send_time) {
return nullptr;
}
}
}
return packet_queue_.Pop();
}
void PacingController::OnPacketSent(RtpPacketMediaType packet_type,
DataSize packet_size, Timestamp send_time) {
if (!first_sent_packet_time_ && packet_type != RtpPacketMediaType::kPadding) {
first_sent_packet_time_ = send_time;
}
bool audio_packet = packet_type == RtpPacketMediaType::kAudio;
if ((!audio_packet || account_for_audio_) && packet_size > DataSize::Zero()) {
UpdateBudgetWithSentData(packet_size);
}
last_send_time_ = send_time;
}
void PacingController::UpdateBudgetWithElapsedTime(TimeDelta delta) {
media_debt_ -= std::min(media_debt_, adjusted_media_rate_ * delta);
padding_debt_ -= std::min(padding_debt_, padding_rate_ * delta);
}
void PacingController::UpdateBudgetWithSentData(DataSize size) {
media_debt_ += size;
media_debt_ = std::min(media_debt_, adjusted_media_rate_ * kMaxDebtInTime);
UpdatePaddingBudgetWithSentData(size);
}
void PacingController::UpdatePaddingBudgetWithSentData(DataSize size) {
padding_debt_ += size;
padding_debt_ = std::min(padding_debt_, padding_rate_ * kMaxDebtInTime);
}
void PacingController::SetQueueTimeLimit(TimeDelta limit) {
queue_time_limit_ = limit;
}
void PacingController::MaybeUpdateMediaRateDueToLongQueue(Timestamp now) {
adjusted_media_rate_ = pacing_rate_;
if (!drain_large_queues_) {
return;
}
DataSize queue_size_data = QueueSizeData();
if (queue_size_data > DataSize::Zero()) {
// Assuming equal size packets and input/output rate, the average packet
// has avg_time_left_ms left to get queue_size_bytes out of the queue, if
// time constraint shall be met. Determine bitrate needed for that.
packet_queue_.UpdateAverageQueueTime(now);
TimeDelta avg_time_left =
std::max(TimeDelta::Millis(1),
queue_time_limit_ - packet_queue_.AverageQueueTime());
DataRate min_rate_needed = queue_size_data / avg_time_left;
if (min_rate_needed > pacing_rate_) {
adjusted_media_rate_ = min_rate_needed;
LOG_INFO("bwe:large_pacing_queue pacing_rate_kbps={}",
pacing_rate_.kbps());
}
}
}
Timestamp PacingController::NextUnpacedSendTime() const {
if (!pace_audio_) {
Timestamp leading_audio_send_time =
packet_queue_.LeadingPacketEnqueueTime(RtpPacketMediaType::kAudio);
if (leading_audio_send_time.IsFinite()) {
return leading_audio_send_time;
}
}
if (fast_retransmissions_) {
Timestamp leading_retransmission_send_time =
packet_queue_.LeadingPacketEnqueueTimeForRetransmission();
if (leading_retransmission_send_time.IsFinite()) {
return leading_retransmission_send_time;
}
}
return Timestamp::MinusInfinity();
}
} // namespace webrtc

284
src/qos/pacing_controller.h Normal file
View File

@@ -0,0 +1,284 @@
/*
* Copyright (c) 2019 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#ifndef MODULES_PACING_PACING_CONTROLLER_H_
#define MODULES_PACING_PACING_CONTROLLER_H_
#include <stddef.h>
#include <stdint.h>
#include <array>
#include <memory>
#include <optional>
#include <vector>
#include "api/array_view.h"
#include "api/clock/clock.h"
#include "api/transport/network_types.h"
#include "api/units/data_rate.h"
#include "api/units/data_size.h"
#include "api/units/time_delta.h"
#include "api/units/timestamp.h"
#include "bitrate_prober.h"
#include "prioritized_packet_queue.h"
#include "rtp_packet_to_send.h"
#include "rtp_rtcp_defines.h"
namespace webrtc {
// This class implements a leaky-bucket packet pacing algorithm. It handles the
// logic of determining which packets to send when, but the actual timing of
// the processing is done externally (e.g. RtpPacketPacer). Furthermore, the
// forwarding of packets when they are ready to be sent is also handled
// externally, via the PacingController::PacketSender interface.
class PacingController {
public:
class PacketSender {
public:
virtual ~PacketSender() = default;
virtual void SendPacket(std::unique_ptr<RtpPacketToSend> packet,
const PacedPacketInfo& cluster_info) = 0;
// Should be called after each call to SendPacket().
virtual std::vector<std::unique_ptr<RtpPacketToSend>> FetchFec() = 0;
virtual std::vector<std::unique_ptr<RtpPacketToSend>> GeneratePadding(
DataSize size) = 0;
// TODO(bugs.webrtc.org/1439830): Make pure virtual once subclasses adapt.
virtual void OnBatchComplete() {}
// TODO(bugs.webrtc.org/11340): Make pure virtual once downstream projects
// have been updated.
virtual void OnAbortedRetransmissions(
uint32_t /* ssrc */,
rtc::ArrayView<const uint16_t> /* sequence_numbers */) {}
virtual std::optional<uint32_t> GetRtxSsrcForMedia(
uint32_t /* ssrc */) const {
return std::nullopt;
}
};
// If no media or paused, wake up at least every `kPausedProcessIntervalMs` in
// order to send a keep-alive packet so we don't get stuck in a bad state due
// to lack of feedback.
static const TimeDelta kPausedProcessInterval;
// The default minimum time that should elapse calls to `ProcessPackets()`.
static const TimeDelta kMinSleepTime;
// When padding should be generated, add packets to the buffer with a size
// corresponding to this duration times the current padding rate.
static const TimeDelta kTargetPaddingDuration;
// The maximum time that the pacer can use when "replaying" passed time where
// padding should have been generated.
static const TimeDelta kMaxPaddingReplayDuration;
// Allow probes to be processed slightly ahead of inteded send time. Currently
// set to 1ms as this is intended to allow times be rounded down to the
// nearest millisecond.
static const TimeDelta kMaxEarlyProbeProcessing;
// Max total size of packets expected to be sent in a burst in order to not
// risk loosing packets due to too small send socket buffers. It upper limits
// the send burst interval.
// Ex: max send burst interval = 63Kb / 10Mbit/s = 50ms.
static constexpr DataSize kMaxBurstSize = DataSize::Bytes(63 * 1000);
// Configuration default values.
static constexpr TimeDelta kDefaultBurstInterval = TimeDelta::Millis(40);
static constexpr TimeDelta kMaxExpectedQueueLength = TimeDelta::Millis(2000);
struct Configuration {
// If the pacer queue grows longer than the configured max queue limit,
// pacer sends at the minimum rate needed to keep the max queue limit and
// ignore the current bandwidth estimate.
bool drain_large_queues = true;
// Expected max pacer delay. If ExpectedQueueTime() is higher than
// this value, the packet producers should wait (eg drop frames rather than
// encoding them). Bitrate sent may temporarily exceed target set by
// SetPacingRates() so that this limit will be upheld if
// `drain_large_queues` is set.
TimeDelta queue_time_limit = kMaxExpectedQueueLength;
// If the first packet of a keyframe is enqueued on a RTP stream, pacer
// skips forward to that packet and drops other enqueued packets on that
// stream, unless a keyframe is already being paced.
bool keyframe_flushing = false;
// Audio retransmission is prioritized before video retransmission packets.
bool prioritize_audio_retransmission = false;
// Configure separate timeouts per priority. After a timeout, a packet of
// that sort will not be paced and instead dropped.
// Note: to set TTL on audio retransmission,
// `prioritize_audio_retransmission` must be true.
PacketQueueTTL packet_queue_ttl;
// The pacer is allowed to send enqueued packets in bursts and can build up
// a packet "debt" that correspond to approximately the send rate during the
// burst interval.
TimeDelta send_burst_interval = kDefaultBurstInterval;
};
static Configuration DefaultConfiguration() { return Configuration{}; }
PacingController(Clock* clock, PacketSender* packet_sender,
Configuration configuration = DefaultConfiguration());
~PacingController();
// Adds the packet to the queue and calls PacketRouter::SendPacket() when
// it's time to send.
void EnqueuePacket(std::unique_ptr<RtpPacketToSend> packet);
void CreateProbeClusters(
rtc::ArrayView<const ProbeClusterConfig> probe_cluster_configs);
void Pause(); // Temporarily pause all sending.
void Resume(); // Resume sending packets.
bool IsPaused() const;
void SetCongested(bool congested);
// Sets the pacing rates. Must be called once before packets can be sent.
void SetPacingRates(DataRate pacing_rate, DataRate padding_rate);
DataRate pacing_rate() const { return adjusted_media_rate_; }
// Currently audio traffic is not accounted by pacer and passed through.
// With the introduction of audio BWE audio traffic will be accounted for
// the pacer budget calculation. The audio traffic still will be injected
// at high priority.
void SetAccountForAudioPackets(bool account_for_audio);
void SetIncludeOverhead();
void SetTransportOverhead(DataSize overhead_per_packet);
// The pacer is allowed to send enqued packets in bursts and can build up a
// packet "debt" that correspond to approximately the send rate during
// 'burst_interval'.
void SetSendBurstInterval(TimeDelta burst_interval);
// A probe may be sent without first waing for a media packet.
void SetAllowProbeWithoutMediaPacket(bool allow);
// Returns the time when the oldest packet was queued.
Timestamp OldestPacketEnqueueTime() const;
// Number of packets in the pacer queue.
size_t QueueSizePackets() const;
// Number of packets in the pacer queue per media type (RtpPacketMediaType
// values are used as lookup index).
const std::array<int, kNumMediaTypes>& SizeInPacketsPerRtpPacketMediaType()
const;
// Totals size of packets in the pacer queue.
DataSize QueueSizeData() const;
// Current buffer level, i.e. max of media and padding debt.
DataSize CurrentBufferLevel() const;
// Returns the time when the first packet was sent.
std::optional<Timestamp> FirstSentPacketTime() const;
// Returns the number of milliseconds it will take to send the current
// packets in the queue, given the current size and bitrate, ignoring prio.
TimeDelta ExpectedQueueTime() const;
void SetQueueTimeLimit(TimeDelta limit);
// Enable bitrate probing. Enabled by default, mostly here to simplify
// testing. Must be called before any packets are being sent to have an
// effect.
void SetProbingEnabled(bool enabled);
// Returns the next time we expect ProcessPackets() to be called.
Timestamp NextSendTime() const;
// Check queue of pending packets and send them or padding packets, if budget
// is available.
void ProcessPackets();
bool IsProbing() const;
// Note: Intended for debugging purposes only, will be removed.
// Sets the number of iterations of the main loop in `ProcessPackets()` that
// is considered erroneous to exceed.
void SetCircuitBreakerThreshold(int num_iterations);
// Remove any pending packets matching this SSRC from the packet queue.
void RemovePacketsForSsrc(uint32_t ssrc);
private:
TimeDelta UpdateTimeAndGetElapsed(Timestamp now);
bool ShouldSendKeepalive(Timestamp now) const;
// Updates the number of bytes that can be sent for the next time interval.
void UpdateBudgetWithElapsedTime(TimeDelta delta);
void UpdateBudgetWithSentData(DataSize size);
void UpdatePaddingBudgetWithSentData(DataSize size);
DataSize PaddingToAdd(DataSize recommended_probe_size,
DataSize data_sent) const;
std::unique_ptr<RtpPacketToSend> GetPendingPacket(
const PacedPacketInfo& pacing_info, Timestamp target_send_time,
Timestamp now);
void OnPacketSent(RtpPacketMediaType packet_type, DataSize packet_size,
Timestamp send_time);
void MaybeUpdateMediaRateDueToLongQueue(Timestamp now);
Timestamp CurrentTime() const;
// Helper methods for packet that may not be paced. Returns a finite Timestamp
// if a packet type is configured to not be paced and the packet queue has at
// least one packet of that type. Otherwise returns
// Timestamp::MinusInfinity().
Timestamp NextUnpacedSendTime() const;
Clock* const clock_;
PacketSender* const packet_sender_;
const bool drain_large_queues_;
const bool send_padding_if_silent_;
const bool pace_audio_;
const bool ignore_transport_overhead_;
const bool fast_retransmissions_;
const bool keyframe_flushing_;
DataRate max_rate = DataRate::BitsPerSec(100'000'000);
DataSize transport_overhead_per_packet_;
TimeDelta send_burst_interval_;
// TODO(webrtc:9716): Remove this when we are certain clocks are monotonic.
// The last millisecond timestamp returned by `clock_`.
mutable Timestamp last_timestamp_;
bool paused_;
// Amount of outstanding data for media and padding.
DataSize media_debt_;
DataSize padding_debt_;
// The target pacing rate, signaled via SetPacingRates().
DataRate pacing_rate_;
// The media send rate, which might adjusted from pacing_rate_, e.g. if the
// pacing queue is growing too long.
DataRate adjusted_media_rate_;
// The padding target rate. We aim to fill up to this rate with padding what
// is not already used by media.
DataRate padding_rate_;
BitrateProber prober_;
bool probing_send_failure_;
Timestamp last_process_time_;
Timestamp last_send_time_;
std::optional<Timestamp> first_sent_packet_time_;
bool seen_first_packet_;
PrioritizedPacketQueue packet_queue_;
bool congested_;
TimeDelta queue_time_limit_;
bool account_for_audio_;
bool include_overhead_;
int circuit_breaker_threshold_;
};
} // namespace webrtc
#endif // MODULES_PACING_PACING_CONTROLLER_H_

View File

@@ -0,0 +1,443 @@
/*
* Copyright (c) 2022 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#include "prioritized_packet_queue.h"
#include <algorithm>
#include <array>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <memory>
#include <optional>
#include <utility>
#include "api/units/data_size.h"
#include "api/units/time_delta.h"
#include "api/units/timestamp.h"
#include "inlined_vector.h"
#include "log.h"
#include "rtp_packet_to_send.h"
#include "rtp_rtcp_defines.h"
namespace webrtc {
namespace {
constexpr int kAudioPrioLevel = 0;
int GetPriorityForType(
RtpPacketMediaType type,
std::optional<RtpPacketToSend::OriginalType> original_type) {
// Lower number takes priority over higher.
switch (type) {
case RtpPacketMediaType::kAudio:
// Audio is always prioritized over other packet types.
return kAudioPrioLevel;
case RtpPacketMediaType::kRetransmission:
// Send retransmissions before new media. If original_type is set, audio
// retransmission is prioritized more than video retransmission.
if (original_type == RtpPacketToSend::OriginalType::kVideo) {
return kAudioPrioLevel + 2;
}
return kAudioPrioLevel + 1;
case RtpPacketMediaType::kVideo:
case RtpPacketMediaType::kForwardErrorCorrection:
// Video has "normal" priority, in the old speak.
// Send redundancy concurrently to video. If it is delayed it might have a
// lower chance of being useful.
return kAudioPrioLevel + 3;
case RtpPacketMediaType::kPadding:
// Packets that are in themselves likely useless, only sent to keep the
// BWE high.
return kAudioPrioLevel + 4;
}
return -1;
}
} // namespace
InlinedVector<TimeDelta, PrioritizedPacketQueue::kNumPriorityLevels>
PrioritizedPacketQueue::ToTtlPerPrio(PacketQueueTTL packet_queue_ttl) {
InlinedVector<TimeDelta, PrioritizedPacketQueue::kNumPriorityLevels>
ttl_per_prio;
ttl_per_prio.assign(kNumPriorityLevels, TimeDelta::PlusInfinity());
ttl_per_prio[GetPriorityForType(RtpPacketMediaType::kRetransmission,
RtpPacketToSend::OriginalType::kAudio)] =
packet_queue_ttl.audio_retransmission;
ttl_per_prio[GetPriorityForType(RtpPacketMediaType::kRetransmission,
RtpPacketToSend::OriginalType::kVideo)] =
packet_queue_ttl.video_retransmission;
ttl_per_prio[GetPriorityForType(RtpPacketMediaType::kVideo, std::nullopt)] =
packet_queue_ttl.video;
return ttl_per_prio;
}
DataSize PrioritizedPacketQueue::QueuedPacket::PacketSize() const {
return DataSize::Bytes(packet->payload_size() + packet->padding_size());
}
PrioritizedPacketQueue::StreamQueue::StreamQueue(Timestamp creation_time)
: last_enqueue_time_(creation_time), num_keyframe_packets_(0) {}
bool PrioritizedPacketQueue::StreamQueue::EnqueuePacket(QueuedPacket packet,
int priority_level) {
if (packet.packet->is_key_frame()) {
++num_keyframe_packets_;
}
bool first_packet_at_level = packets_[priority_level].empty();
packets_[priority_level].push_back(std::move(packet));
return first_packet_at_level;
}
PrioritizedPacketQueue::QueuedPacket
PrioritizedPacketQueue::StreamQueue::DequeuePacket(int priority_level) {
QueuedPacket packet = std::move(packets_[priority_level].front());
packets_[priority_level].pop_front();
if (packet.packet->is_key_frame()) {
--num_keyframe_packets_;
}
return packet;
}
bool PrioritizedPacketQueue::StreamQueue::HasPacketsAtPrio(
int priority_level) const {
return !packets_[priority_level].empty();
}
bool PrioritizedPacketQueue::StreamQueue::IsEmpty() const {
for (const std::deque<QueuedPacket>& queue : packets_) {
if (!queue.empty()) {
return false;
}
}
return true;
}
Timestamp PrioritizedPacketQueue::StreamQueue::LeadingPacketEnqueueTime(
int priority_level) const {
return packets_[priority_level].begin()->enqueue_time;
}
Timestamp PrioritizedPacketQueue::StreamQueue::LastEnqueueTime() const {
return last_enqueue_time_;
}
std::array<std::deque<PrioritizedPacketQueue::QueuedPacket>,
PrioritizedPacketQueue::kNumPriorityLevels>
PrioritizedPacketQueue::StreamQueue::DequeueAll() {
std::array<std::deque<QueuedPacket>, kNumPriorityLevels> packets_by_prio;
for (int i = 0; i < kNumPriorityLevels; ++i) {
packets_by_prio[i].swap(packets_[i]);
}
num_keyframe_packets_ = 0;
return packets_by_prio;
}
PrioritizedPacketQueue::PrioritizedPacketQueue(
Timestamp creation_time, bool prioritize_audio_retransmission,
PacketQueueTTL packet_queue_ttl)
: prioritize_audio_retransmission_(prioritize_audio_retransmission),
time_to_live_per_prio_(ToTtlPerPrio(packet_queue_ttl)),
queue_time_sum_(TimeDelta::Zero()),
pause_time_sum_(TimeDelta::Zero()),
size_packets_(0),
size_packets_per_media_type_({}),
size_payload_(DataSize::Zero()),
last_update_time_(creation_time),
paused_(false),
last_culling_time_(creation_time),
top_active_prio_level_(-1) {}
void PrioritizedPacketQueue::Push(Timestamp enqueue_time,
std::unique_ptr<RtpPacketToSend> packet) {
StreamQueue* stream_queue;
auto [it, inserted] = streams_.emplace(packet->Ssrc(), nullptr);
if (inserted) {
it->second = std::make_unique<StreamQueue>(enqueue_time);
}
stream_queue = it->second.get();
auto enqueue_time_iterator =
enqueue_times_.insert(enqueue_times_.end(), enqueue_time);
RtpPacketMediaType packet_type = packet->packet_type().value();
int prio_level =
GetPriorityForType(packet_type, prioritize_audio_retransmission_
? packet->original_packet_type()
: std::nullopt);
PurgeOldPacketsAtPriorityLevel(prio_level, enqueue_time);
QueuedPacket queued_packed = {std::move(packet), enqueue_time,
enqueue_time_iterator};
// In order to figure out how much time a packet has spent in the queue
// while not in a paused state, we subtract the total amount of time the
// queue has been paused so far, and when the packet is popped we subtract
// the total amount of time the queue has been paused at that moment. This
// way we subtract the total amount of time the packet has spent in the
// queue while in a paused state.
UpdateAverageQueueTime(enqueue_time);
queued_packed.enqueue_time -= pause_time_sum_;
++size_packets_;
++size_packets_per_media_type_[static_cast<size_t>(packet_type)];
size_payload_ += queued_packed.PacketSize();
if (stream_queue->EnqueuePacket(std::move(queued_packed), prio_level)) {
// Number packets at `prio_level` for this steam is now non-zero.
streams_by_prio_[prio_level].push_back(stream_queue);
}
if (top_active_prio_level_ < 0 || prio_level < top_active_prio_level_) {
top_active_prio_level_ = prio_level;
}
static constexpr TimeDelta kTimeout = TimeDelta::Millis(500);
if (enqueue_time - last_culling_time_ > kTimeout) {
for (auto it = streams_.begin(); it != streams_.end();) {
if (it->second->IsEmpty() &&
it->second->LastEnqueueTime() + kTimeout < enqueue_time) {
streams_.erase(it++);
} else {
++it;
}
}
last_culling_time_ = enqueue_time;
}
}
std::unique_ptr<RtpPacketToSend> PrioritizedPacketQueue::Pop() {
if (size_packets_ == 0) {
return nullptr;
}
StreamQueue& stream_queue = *streams_by_prio_[top_active_prio_level_].front();
QueuedPacket packet = stream_queue.DequeuePacket(top_active_prio_level_);
DequeuePacketInternal(packet);
// Remove StreamQueue from head of fifo-queue for this prio level, and
// and add it to the end if it still has packets.
streams_by_prio_[top_active_prio_level_].pop_front();
if (stream_queue.HasPacketsAtPrio(top_active_prio_level_)) {
streams_by_prio_[top_active_prio_level_].push_back(&stream_queue);
} else {
MaybeUpdateTopPrioLevel();
}
return std::move(packet.packet);
}
int PrioritizedPacketQueue::SizeInPackets() const { return size_packets_; }
DataSize PrioritizedPacketQueue::SizeInPayloadBytes() const {
return size_payload_;
}
bool PrioritizedPacketQueue::Empty() const { return size_packets_ == 0; }
const std::array<int, kNumMediaTypes>&
PrioritizedPacketQueue::SizeInPacketsPerRtpPacketMediaType() const {
return size_packets_per_media_type_;
}
Timestamp PrioritizedPacketQueue::LeadingPacketEnqueueTime(
RtpPacketMediaType type) const {
const int priority_level = GetPriorityForType(type, std::nullopt);
if (streams_by_prio_[priority_level].empty()) {
return Timestamp::MinusInfinity();
}
return streams_by_prio_[priority_level].front()->LeadingPacketEnqueueTime(
priority_level);
}
Timestamp PrioritizedPacketQueue::LeadingPacketEnqueueTimeForRetransmission()
const {
if (!prioritize_audio_retransmission_) {
const int priority_level =
GetPriorityForType(RtpPacketMediaType::kRetransmission, std::nullopt);
if (streams_by_prio_[priority_level].empty()) {
return Timestamp::PlusInfinity();
}
return streams_by_prio_[priority_level].front()->LeadingPacketEnqueueTime(
priority_level);
}
const int audio_priority_level =
GetPriorityForType(RtpPacketMediaType::kRetransmission,
RtpPacketToSend::OriginalType::kAudio);
const int video_priority_level =
GetPriorityForType(RtpPacketMediaType::kRetransmission,
RtpPacketToSend::OriginalType::kVideo);
Timestamp next_audio =
streams_by_prio_[audio_priority_level].empty()
? Timestamp::PlusInfinity()
: streams_by_prio_[audio_priority_level]
.front()
->LeadingPacketEnqueueTime(audio_priority_level);
Timestamp next_video =
streams_by_prio_[video_priority_level].empty()
? Timestamp::PlusInfinity()
: streams_by_prio_[video_priority_level]
.front()
->LeadingPacketEnqueueTime(video_priority_level);
return std::min(next_audio, next_video);
}
Timestamp PrioritizedPacketQueue::OldestEnqueueTime() const {
return enqueue_times_.empty() ? Timestamp::MinusInfinity()
: enqueue_times_.front();
}
TimeDelta PrioritizedPacketQueue::AverageQueueTime() const {
if (size_packets_ == 0) {
return TimeDelta::Zero();
}
return queue_time_sum_ / size_packets_;
}
void PrioritizedPacketQueue::UpdateAverageQueueTime(Timestamp now) {
if (now == last_update_time_) {
return;
}
TimeDelta delta = now - last_update_time_;
if (paused_) {
pause_time_sum_ += delta;
} else {
queue_time_sum_ += delta * size_packets_;
}
last_update_time_ = now;
}
void PrioritizedPacketQueue::SetPauseState(bool paused, Timestamp now) {
UpdateAverageQueueTime(now);
paused_ = paused;
}
void PrioritizedPacketQueue::RemovePacketsForSsrc(uint32_t ssrc) {
auto kv = streams_.find(ssrc);
if (kv != streams_.end()) {
// Dequeue all packets from the queue for this SSRC.
StreamQueue& queue = *kv->second;
std::array<std::deque<QueuedPacket>, kNumPriorityLevels> packets_by_prio =
queue.DequeueAll();
for (int i = 0; i < kNumPriorityLevels; ++i) {
std::deque<QueuedPacket>& packet_queue = packets_by_prio[i];
if (packet_queue.empty()) {
continue;
}
// First erase all packets at this prio level.
while (!packet_queue.empty()) {
QueuedPacket packet = std::move(packet_queue.front());
packet_queue.pop_front();
DequeuePacketInternal(packet);
}
// Next, deregister this `StreamQueue` from the round-robin tables.
if (streams_by_prio_[i].size() == 1) {
// This is the last and only queue that had packets for this prio level.
// Update the global top prio level if neccessary.
streams_by_prio_[i].pop_front();
} else {
// More than stream had packets at this prio level, filter this one out.
std::deque<StreamQueue*> filtered_queue;
for (StreamQueue* queue_ptr : streams_by_prio_[i]) {
if (queue_ptr != &queue) {
filtered_queue.push_back(queue_ptr);
}
}
streams_by_prio_[i].swap(filtered_queue);
}
}
}
MaybeUpdateTopPrioLevel();
}
bool PrioritizedPacketQueue::HasKeyframePackets(uint32_t ssrc) const {
auto it = streams_.find(ssrc);
if (it != streams_.end()) {
return it->second->has_keyframe_packets();
}
return false;
}
void PrioritizedPacketQueue::DequeuePacketInternal(QueuedPacket& packet) {
--size_packets_;
RtpPacketMediaType packet_type = packet.packet->packet_type().value();
--size_packets_per_media_type_[static_cast<size_t>(packet_type)];
size_payload_ -= packet.PacketSize();
// Calculate the total amount of time spent by this packet in the queue
// while in a non-paused state. Note that the `pause_time_sum_ms_` was
// subtracted from `packet.enqueue_time_ms` when the packet was pushed, and
// by subtracting it now we effectively remove the time spent in in the
// queue while in a paused state.
TimeDelta time_in_non_paused_state =
last_update_time_ - packet.enqueue_time - pause_time_sum_;
queue_time_sum_ -= time_in_non_paused_state;
// Set the time spent in the send queue, which is the per-packet equivalent of
// totalPacketSendDelay. The notion of being paused is an implementation
// detail that we do not want to expose, so it makes sense to report the
// metric excluding the pause time. This also avoids spikes in the metric.
// https://w3c.github.io/webrtc-stats/#dom-rtcoutboundrtpstreamstats-totalpacketsenddelay
packet.packet->set_time_in_send_queue(time_in_non_paused_state);
enqueue_times_.erase(packet.enqueue_time_iterator);
}
void PrioritizedPacketQueue::MaybeUpdateTopPrioLevel() {
if (top_active_prio_level_ != -1 &&
!streams_by_prio_[top_active_prio_level_].empty()) {
return;
}
// No stream queues have packets at top_active_prio_level_, find top priority
// that is not empty.
for (int i = 0; i < kNumPriorityLevels; ++i) {
PurgeOldPacketsAtPriorityLevel(i, last_update_time_);
if (!streams_by_prio_[i].empty()) {
top_active_prio_level_ = i;
break;
}
}
if (size_packets_ == 0) {
// There are no packets left to send. Last packet may have been purged. Prio
// will change when a new packet is pushed.
top_active_prio_level_ = -1;
}
}
void PrioritizedPacketQueue::PurgeOldPacketsAtPriorityLevel(int prio_level,
Timestamp now) {
TimeDelta time_to_live = time_to_live_per_prio_[prio_level];
if (time_to_live.IsInfinite()) {
return;
}
std::deque<StreamQueue*>& queues = streams_by_prio_[prio_level];
auto iter = queues.begin();
while (iter != queues.end()) {
StreamQueue* queue_ptr = *iter;
while (queue_ptr->HasPacketsAtPrio(prio_level) &&
(now - queue_ptr->LeadingPacketEnqueueTime(prio_level)) >
time_to_live) {
QueuedPacket packet = queue_ptr->DequeuePacket(prio_level);
LOG_INFO("Dropping old packet on SSRC: %u seq: %u time in queue: %lld ms",
packet.packet->Ssrc(), packet.packet->SequenceNumber(),
(now - packet.enqueue_time).ms());
DequeuePacketInternal(packet);
}
if (!queue_ptr->HasPacketsAtPrio(prio_level)) {
iter = queues.erase(iter);
} else {
++iter;
}
}
}
} // namespace webrtc

View File

@@ -0,0 +1,197 @@
/*
* Copyright (c) 2022 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#ifndef MODULES_PACING_PRIORITIZED_PACKET_QUEUE_H_
#define MODULES_PACING_PRIORITIZED_PACKET_QUEUE_H_
#include <stddef.h>
#include <array>
#include <cstdint>
#include <deque>
#include <list>
#include <memory>
#include <unordered_map>
#include "api/units/data_size.h"
#include "api/units/time_delta.h"
#include "api/units/timestamp.h"
#include "inlined_vector.h"
#include "rtp_packet_to_send.h"
#include "rtp_rtcp_defines.h"
namespace webrtc {
// Describes how long time a packet may stay in the queue before being dropped.
struct PacketQueueTTL {
TimeDelta audio_retransmission = TimeDelta::PlusInfinity();
TimeDelta video_retransmission = TimeDelta::PlusInfinity();
TimeDelta video = TimeDelta::PlusInfinity();
};
class PrioritizedPacketQueue {
public:
explicit PrioritizedPacketQueue(
Timestamp creation_time, bool prioritize_audio_retransmission = false,
PacketQueueTTL packet_queue_ttl = PacketQueueTTL());
PrioritizedPacketQueue(const PrioritizedPacketQueue&) = delete;
PrioritizedPacketQueue& operator=(const PrioritizedPacketQueue&) = delete;
// Add a packet to the queue. The enqueue time is used for queue time stats
// and to report the leading packet enqueue time per packet type.
void Push(Timestamp enqueue_time, std::unique_ptr<RtpPacketToSend> packet);
// Remove the next packet from the queue. Packets a prioritized first
// according to packet type, in the following order:
// - audio, retransmissions, video / fec, padding
// For each packet type, we use one FIFO-queue per SSRC and emit from
// those queues in a round-robin fashion.
std::unique_ptr<RtpPacketToSend> Pop();
// Number of packets in the queue.
int SizeInPackets() const;
// Sum of all payload bytes in the queue, where the payload is calculated
// as `packet->payload_size() + packet->padding_size()`.
DataSize SizeInPayloadBytes() const;
// Convenience method for `SizeInPackets() == 0`.
bool Empty() const;
// Total packets in the queue per media type (RtpPacketMediaType values are
// used as lookup index).
const std::array<int, kNumMediaTypes>& SizeInPacketsPerRtpPacketMediaType()
const;
// The enqueue time of the next packet this queue will return via the Pop()
// method, for the given packet type. If queue has no packets, of that type,
// returns Timestamp::MinusInfinity().
Timestamp LeadingPacketEnqueueTime(RtpPacketMediaType type) const;
Timestamp LeadingPacketEnqueueTimeForRetransmission() const;
// Enqueue time of the oldest packet in the queue,
// Timestamp::MinusInfinity() if queue is empty.
Timestamp OldestEnqueueTime() const;
// Average queue time for the packets currently in the queue.
// The queuing time is calculated from Push() to the last UpdateQueueTime()
// call - with any time spent in a paused state subtracted.
// Returns TimeDelta::Zero() for an empty queue.
TimeDelta AverageQueueTime() const;
// Called during packet processing or when pause stats changes. Since the
// AverageQueueTime() method does not look at the wall time, this method
// needs to be called before querying queue time.
void UpdateAverageQueueTime(Timestamp now);
// Set the pause state, while `paused` is true queuing time is not counted.
void SetPauseState(bool paused, Timestamp now);
// Remove any packets matching the given SSRC.
void RemovePacketsForSsrc(uint32_t ssrc);
// Checks if the queue for the given SSRC has original (retransmissions not
// counted) video packets containing keyframe data.
bool HasKeyframePackets(uint32_t ssrc) const;
private:
static constexpr int kNumPriorityLevels = 5;
class QueuedPacket {
public:
DataSize PacketSize() const;
std::unique_ptr<RtpPacketToSend> packet;
Timestamp enqueue_time;
std::list<Timestamp>::iterator enqueue_time_iterator;
};
// Class containing packets for an RTP stream.
// For each priority level, packets are simply stored in a fifo queue.
class StreamQueue {
public:
explicit StreamQueue(Timestamp creation_time);
StreamQueue(StreamQueue&&) = default;
StreamQueue& operator=(StreamQueue&&) = default;
StreamQueue(const StreamQueue&) = delete;
StreamQueue& operator=(const StreamQueue&) = delete;
// Enqueue packet at the given priority level. Returns true if the packet
// count for that priority level went from zero to non-zero.
bool EnqueuePacket(QueuedPacket packet, int priority_level);
QueuedPacket DequeuePacket(int priority_level);
bool HasPacketsAtPrio(int priority_level) const;
bool IsEmpty() const;
Timestamp LeadingPacketEnqueueTime(int priority_level) const;
Timestamp LastEnqueueTime() const;
bool has_keyframe_packets() const { return num_keyframe_packets_ > 0; }
std::array<std::deque<QueuedPacket>, kNumPriorityLevels> DequeueAll();
private:
std::deque<QueuedPacket> packets_[kNumPriorityLevels];
Timestamp last_enqueue_time_;
int num_keyframe_packets_;
};
// Remove the packet from the internal state, e.g. queue time / size etc.
void DequeuePacketInternal(QueuedPacket& packet);
// Check if the queue pointed to by `top_active_prio_level_` is empty and
// if so move it to the lowest non-empty index.
void MaybeUpdateTopPrioLevel();
void PurgeOldPacketsAtPriorityLevel(int prio_level, Timestamp now);
static InlinedVector<TimeDelta, kNumPriorityLevels> ToTtlPerPrio(
PacketQueueTTL);
const bool prioritize_audio_retransmission_;
const InlinedVector<TimeDelta, kNumPriorityLevels> time_to_live_per_prio_;
// Cumulative sum, over all packets, of time spent in the queue.
TimeDelta queue_time_sum_;
// Cumulative sum of time the queue has spent in a paused state.
TimeDelta pause_time_sum_;
// Total number of packets stored in this queue.
int size_packets_;
// Total number of packets stored in this queue per RtpPacketMediaType.
std::array<int, kNumMediaTypes> size_packets_per_media_type_;
// Sum of payload sizes for all packts stored in this queue.
DataSize size_payload_;
// The last time queue/pause time sums were updated.
Timestamp last_update_time_;
bool paused_;
// Last time `streams_` was culled for inactive streams.
Timestamp last_culling_time_;
// Map from SSRC to packet queues for the associated RTP stream.
std::unordered_map<uint32_t, std::unique_ptr<StreamQueue>> streams_;
// For each priority level, a queue of StreamQueues which have at least one
// packet pending for that prio level.
std::deque<StreamQueue*> streams_by_prio_[kNumPriorityLevels];
// The first index into `stream_by_prio_` that is non-empty.
int top_active_prio_level_;
// Ordered list of enqueue times. Additions are always increasing and added to
// the end. QueuedPacket instances have a iterators into this list for fast
// removal.
std::list<Timestamp> enqueue_times_;
};
} // namespace webrtc
#endif // MODULES_PACING_PRIORITIZED_PACKET_QUEUE_H_

View File

@@ -0,0 +1,72 @@
/*
* Copyright (c) 2019 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#ifndef MODULES_PACING_RTP_PACKET_PACER_H_
#define MODULES_PACING_RTP_PACKET_PACER_H_
#include <optional>
#include <vector>
#include "api/transport/network_types.h"
#include "api/units/data_rate.h"
#include "api/units/data_size.h"
#include "api/units/time_delta.h"
#include "api/units/timestamp.h"
namespace webrtc {
class RtpPacketPacer {
public:
virtual ~RtpPacketPacer() = default;
virtual void CreateProbeClusters(
std::vector<ProbeClusterConfig> probe_cluster_configs) = 0;
// Temporarily pause all sending.
virtual void Pause() = 0;
// Resume sending packets.
virtual void Resume() = 0;
virtual void SetCongested(bool congested) = 0;
// Sets the pacing rates. Must be called once before packets can be sent.
virtual void SetPacingRates(DataRate pacing_rate, DataRate padding_rate) = 0;
// Time since the oldest packet currently in the queue was added.
virtual TimeDelta OldestPacketWaitTime() const = 0;
// Sum of payload + padding bytes of all packets currently in the pacer queue.
virtual DataSize QueueSizeData() const = 0;
// Returns the time when the first packet was sent.
virtual std::optional<Timestamp> FirstSentPacketTime() const = 0;
// Returns the expected number of milliseconds it will take to send the
// current packets in the queue, given the current size and bitrate, ignoring
// priority.
virtual TimeDelta ExpectedQueueTime() const = 0;
// Set the average upper bound on pacer queuing delay. The pacer may send at
// a higher rate than what was configured via SetPacingRates() in order to
// keep ExpectedQueueTimeMs() below `limit_ms` on average.
virtual void SetQueueTimeLimit(TimeDelta limit) = 0;
// Currently audio traffic is not accounted by pacer and passed through.
// With the introduction of audio BWE audio traffic will be accounted for
// the pacer budget calculation. The audio traffic still will be injected
// at high priority.
virtual void SetAccountForAudioPackets(bool account_for_audio) = 0;
virtual void SetIncludeOverhead() = 0;
virtual void SetTransportOverhead(DataSize overhead_per_packet) = 0;
};
} // namespace webrtc
#endif // MODULES_PACING_RTP_PACKET_PACER_H_

View File

@@ -52,6 +52,7 @@ void IceTransportController::Create(
CreateAudioCodec();
controller_ = std::make_unique<CongestionControl>();
packet_sender_ = std::make_unique<PacketSender>(ice_agent, webrtc_clock_);
resolution_adapter_ = std::make_unique<ResolutionAdapter>();
video_channel_send_ = std::make_unique<VideoChannelSend>(
@@ -471,7 +472,6 @@ void IceTransportController::PostUpdates(webrtc::NetworkControlUpdate update) {
target_bitrate_ = target_bitrate;
int width, height, target_width, target_height;
video_encoder_->GetResolution(&width, &height);
if (0 == resolution_adapter_->GetResolution(target_bitrate_, width,
height, &target_width,
&target_height)) {
@@ -480,19 +480,20 @@ void IceTransportController::PostUpdates(webrtc::NetworkControlUpdate update) {
target_height_ = target_height;
b_force_i_frame_ = true;
// LOG_INFO("Set target resolution [{}x{}]", target_width_.value(),
// target_height_.value());
}
} else if (target_width_.has_value() && target_height_.has_value()) {
target_width_.reset();
target_height_.reset();
// LOG_INFO("Use original resolution [{}x{}]", source_width_,
// source_height_);
}
video_encoder_->SetTargetBitrate(target_bitrate_);
LOG_WARN("Set target bitrate [{}]bps", target_bitrate_);
}
}
if (!update.probe_cluster_configs.empty()) {
packet_sender_->CreateProbeClusters(
std::move(update.probe_cluster_configs));
}
}
void IceTransportController::UpdateControlState() {

View File

@@ -14,12 +14,14 @@
#include "audio_channel_send.h"
#include "audio_decoder.h"
#include "audio_encoder.h"
#include "bitrate_prober.h"
#include "clock/system_clock.h"
#include "congestion_control.h"
#include "congestion_control_feedback.h"
#include "data_channel_receive.h"
#include "data_channel_send.h"
#include "ice_agent.h"
#include "packet_sender.h"
#include "resolution_adapter.h"
#include "transport_feedback_adapter.h"
#include "video_channel_receive.h"
@@ -103,7 +105,7 @@ class IceTransportController
std::shared_ptr<IceAgent> ice_agent_ = nullptr;
std::shared_ptr<IOStatistics> ice_io_statistics_ = nullptr;
std::unique_ptr<RtpPacketizer> rtp_packetizer_ = nullptr;
std::unique_ptr<RtpVideoSender> rtp_video_sender_ = nullptr;
std::unique_ptr<PacketSender> packet_sender_ = nullptr;
std::string remote_user_id_;
void *user_data_ = nullptr;
@@ -112,6 +114,7 @@ class IceTransportController
std::shared_ptr<webrtc::Clock> webrtc_clock_ = nullptr;
webrtc::TransportFeedbackAdapter transport_feedback_adapter_;
std::unique_ptr<CongestionControl> controller_;
BitrateProber prober_;
private:
std::unique_ptr<VideoEncoder> video_encoder_ = nullptr;

View File

@@ -0,0 +1,118 @@
#include "packet_sender.h"
#include "log.h"
PacketSender::PacketSender(std::shared_ptr<IceAgent> ice_agent,
std::shared_ptr<webrtc::Clock> clock)
: ice_agent_(ice_agent),
clock_(clock),
pacing_controller_(clock.get(), this) {}
PacketSender::~PacketSender() {}
// int PacketSender::SendPacket(const char *data, size_t size) {
// LOG_INFO("Send packet, size: %d", size);
// return ice_agent_->Send(data, size);
// }
// void PacketSender::CreateProbeClusters(
// std::vector<webrtc::ProbeClusterConfig> probe_cluster_configs) {
// pacing_controller_.CreateProbeClusters(probe_cluster_configs);
// MaybeScheduleProcessPackets();
// }
// void PacketSender::MaybeScheduleProcessPackets() {
// if (!processing_packets_)
// MaybeProcessPackets(webrtc::Timestamp::MinusInfinity());
// }
// void PacketSender::MaybeProcessPackets(
// webrtc::Timestamp scheduled_process_time) {
// if (is_shutdown_ || !is_started_) {
// return;
// }
// // Protects against re-entry from transport feedback calling into the task
// // queue pacer.
// processing_packets_ = true;
// auto cleanup = std::unique_ptr<void, std::function<void(void *)>>(
// nullptr, [this](void *) { processing_packets_ = false; });
// webrtc::Timestamp next_send_time = pacing_controller_.NextSendTime();
// const webrtc::Timestamp now = clock_->CurrentTime();
// webrtc::TimeDelta early_execute_margin =
// pacing_controller_.IsProbing()
// ? webrtc::PacingController::kMaxEarlyProbeProcessing
// : webrtc::TimeDelta::Zero();
// // Process packets and update stats.
// while (next_send_time <= now + early_execute_margin) {
// pacing_controller_.ProcessPackets();
// next_send_time = pacing_controller_.NextSendTime();
// // Probing state could change. Get margin after process packets.
// early_execute_margin =
// pacing_controller_.IsProbing()
// ? webrtc::PacingController::kMaxEarlyProbeProcessing
// : webrtc::TimeDelta::Zero();
// }
// UpdateStats();
// // Ignore retired scheduled task, otherwise reset `next_process_time_`.
// if (scheduled_process_time.IsFinite()) {
// if (scheduled_process_time != next_process_time_) {
// return;
// }
// next_process_time_ = webrtc::Timestamp::MinusInfinity();
// }
// // Do not hold back in probing.
// webrtc::TimeDelta hold_back_window = webrtc::TimeDelta::Zero();
// if (!pacing_controller_.IsProbing()) {
// hold_back_window = max_hold_back_window_;
// webrtc::DataRate pacing_rate = pacing_controller_.pacing_rate();
// if (max_hold_back_window_in_packets_ != kNoPacketHoldback &&
// !pacing_rate.IsZero() &&
// packet_size_.filtered() != rtc::ExpFilter::kValueUndefined) {
// webrtc::TimeDelta avg_packet_send_time =
// webrtc::DataSize::Bytes(packet_size_.filtered()) / pacing_rate;
// hold_back_window =
// std::min(hold_back_window,
// avg_packet_send_time * max_hold_back_window_in_packets_);
// }
// }
// // Calculate next process time.
// webrtc::TimeDelta time_to_next_process =
// std::max(hold_back_window, next_send_time - now -
// early_execute_margin);
// next_send_time = now + time_to_next_process;
// // If no in flight task or in flight task is later than `next_send_time`,
// // schedule a new one. Previous in flight task will be retired.
// if (next_process_time_.IsMinusInfinity() ||
// next_process_time_ > next_send_time) {
// // Prefer low precision if allowed and not probing.
// task_queue_->PostDelayedHighPrecisionTask(
// SafeTask(
// safety_.flag(),
// [this, next_send_time]() { MaybeProcessPackets(next_send_time);
// }),
// time_to_next_process.RoundUpTo(webrtc::TimeDelta::Millis(1)));
// next_process_time_ = next_send_time;
// }
// }
// void PacketSender::UpdateStats() {
// Stats new_stats;
// new_stats.expected_queue_time = pacing_controller_.ExpectedQueueTime();
// new_stats.first_sent_packet_time =
// pacing_controller_.FirstSentPacketTime();
// new_stats.oldest_packet_enqueue_time =
// pacing_controller_.OldestPacketEnqueueTime();
// new_stats.queue_size = pacing_controller_.QueueSizeData();
// OnStatsUpdated(new_stats);
// }
// PacketSender::Stats PacketSender::GetStats() const { return current_stats_; }

View File

@@ -0,0 +1,116 @@
/*
* @Author: DI JUNKUN
* @Date: 2025-03-12
* Copyright (c) 2025 by DI JUNKUN, All Rights Reserved.
*/
#ifndef _PACKET_SENDER_H_
#define _PACKET_SENDER_H_
#include <memory>
#include "api/array_view.h"
#include "api/transport/network_types.h"
#include "api/units/data_rate.h"
#include "api/units/data_size.h"
#include "api/units/time_delta.h"
#include "api/units/timestamp.h"
#include "ice_agent.h"
#include "pacing_controller.h"
#include "rtc_base/numerics/exp_filter.h"
#include "rtp_packet_pacer.h"
#include "rtp_packet_to_send.h"
class PacketSender : public webrtc::RtpPacketPacer,
public webrtc::PacingController::PacketSender {
public:
PacketSender(std::shared_ptr<IceAgent> ice_agent,
std::shared_ptr<webrtc::Clock> clock);
~PacketSender();
int SendPacket(const char* data, size_t size);
public:
void CreateProbeClusters(
std::vector<webrtc::ProbeClusterConfig> probe_cluster_configs) override{};
// Temporarily pause all sending.
void Pause() override{};
// Resume sending packets.
void Resume() override{};
void SetCongested(bool congested) override{};
// Sets the pacing rates. Must be called once before packets can be sent.
void SetPacingRates(webrtc::DataRate pacing_rate,
webrtc::DataRate padding_rate) override{};
// Time since the oldest packet currently in the queue was added.
webrtc::TimeDelta OldestPacketWaitTime() const override {
return webrtc::TimeDelta::Zero();
};
// Sum of payload + padding bytes of all packets currently in the pacer queue.
webrtc::DataSize QueueSizeData() const override {
return webrtc::DataSize::Zero();
};
// Returns the time when the first packet was sent.
std::optional<webrtc::Timestamp> FirstSentPacketTime() const override {
return {};
}
// Returns the expected number of milliseconds it will take to send the
// current packets in the queue, given the current size and bitrate, ignoring
// priority.
webrtc::TimeDelta ExpectedQueueTime() const override {
return webrtc::TimeDelta::Zero();
};
// Set the average upper bound on pacer queuing delay. The pacer may send at
// a higher rate than what was configured via SetPacingRates() in order to
// keep ExpectedQueueTimeMs() below `limit_ms` on average.
void SetQueueTimeLimit(webrtc::TimeDelta limit) override{};
// Currently audio traffic is not accounted by pacer and passed through.
// With the introduction of audio BWE audio traffic will be accounted for
// the pacer budget calculation. The audio traffic still will be injected
// at high priority.
void SetAccountForAudioPackets(bool account_for_audio) override{};
void SetIncludeOverhead() override{};
void SetTransportOverhead(webrtc::DataSize overhead_per_packet) override{};
public:
void SendPacket(std::unique_ptr<webrtc::RtpPacketToSend> packet,
const webrtc::PacedPacketInfo& cluster_info) override {}
// Should be called after each call to SendPacket().
std::vector<std::unique_ptr<webrtc::RtpPacketToSend>> FetchFec() override {
return {};
}
std::vector<std::unique_ptr<webrtc::RtpPacketToSend>> GeneratePadding(
webrtc::DataSize size) override {
return {};
}
// TODO(bugs.webrtc.org/1439830): Make pure once subclasses adapt.
void OnBatchComplete() override {}
// TODO(bugs.webrtc.org/11340): Make pure once downstream projects
// have been updated.
void OnAbortedRetransmissions(
uint32_t /* ssrc */,
rtc::ArrayView<const uint16_t> /* sequence_numbers */) {}
std::optional<uint32_t> GetRtxSsrcForMedia(
uint32_t /* ssrc */) const override {
return std::nullopt;
}
private:
std::shared_ptr<IceAgent> ice_agent_ = nullptr;
webrtc::PacingController pacing_controller_;
private:
std::shared_ptr<webrtc::Clock> clock_ = nullptr;
};
#endif

View File

@@ -113,10 +113,10 @@ target("ws")
target("rtp")
set_kind("object")
add_deps("log", "common", "frame", "ringbuffer", "thread", "rtcp", "fec", "statistics")
add_files("src/rtp/*.cpp",
add_files("src/rtp/rtp_statistics/*.cpp",
"src/rtp/rtp_packet/*.cpp",
"src/rtp/rtp_packetizer/*.cpp")
add_includedirs("src/rtp",
add_includedirs("src/rtp/rtp_statistics",
"src/rtp/rtp_packet",
"src/rtp/rtp_packetizer", {public = true})
@@ -148,7 +148,7 @@ target("channel")
target("transport")
set_kind("object")
add_deps("log", "ws", "ice", "channel", "rtp", "rtcp", "statistics", "media")
add_deps("log", "ws", "ice", "channel", "rtp", "rtcp", "statistics", "media", "qos")
add_files("src/transport/*.cpp")
add_includedirs("src/transport", {public = true})