diff --git a/src/common/inlined_vector.h b/src/common/inlined_vector.h new file mode 100644 index 0000000..e1fa035 --- /dev/null +++ b/src/common/inlined_vector.h @@ -0,0 +1,65 @@ +/* + * @Author: DI JUNKUN + * @Date: 2025-03-12 + * Copyright (c) 2025 by DI JUNKUN, All Rights Reserved. + */ + +#ifndef _INLINED_VECTOR_H_ +#define _INLINED_VECTOR_H_ + +#include +#include +#include +#include + +template +class InlinedVector { + public: + InlinedVector() : size_(0), use_heap_(false) {} + + void push_back(const T& value) { + if (!use_heap_ && size_ < N) { + stack_data_[size_] = value; + } else { + if (!use_heap_) { + heap_data_.reserve(N * 2); + for (size_t i = 0; i < size_; ++i) { + heap_data_.push_back(stack_data_[i]); + } + use_heap_ = true; + } + heap_data_.push_back(value); + } + ++size_; + } + + void assign(size_t n, const T& value) { + clear(); + for (size_t i = 0; i < n; ++i) { + push_back(value); + } + } + + size_t size() const { return size_; } + T& operator[](size_t index) { + return use_heap_ ? heap_data_[index] : stack_data_[index]; + } + + const T& operator[](size_t index) const { + return use_heap_ ? heap_data_[index] : stack_data_[index]; + } + + private: + void clear() { + size_ = 0; + use_heap_ = false; + heap_data_.clear(); + } + + size_t size_; + bool use_heap_; + std::array stack_data_; + std::vector heap_data_; +}; + +#endif // _INLINED_VECTOR_H_ diff --git a/src/qos/bitrate_prober.cc b/src/qos/bitrate_prober.cc new file mode 100644 index 0000000..53c7607 --- /dev/null +++ b/src/qos/bitrate_prober.cc @@ -0,0 +1,192 @@ +/* + * Copyright (c) 2014 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#include "bitrate_prober.h" + +#include +#include +#include + +#include "api/transport/network_types.h" +#include "api/units/data_rate.h" +#include "api/units/data_size.h" +#include "api/units/time_delta.h" +#include "api/units/timestamp.h" +#include "log.h" + +namespace webrtc { + +namespace { +constexpr TimeDelta kProbeClusterTimeout = TimeDelta::Seconds(5); +constexpr size_t kMaxPendingProbeClusters = 5; + +} // namespace + +BitrateProberConfig::BitrateProberConfig() + : max_probe_delay(TimeDelta::Millis(10)), + min_packet_size(DataSize::Bytes(200)) {} + +BitrateProber::BitrateProber() + : probing_state_(ProbingState::kDisabled), + next_probe_time_(Timestamp::PlusInfinity()) { + SetEnabled(true); +} + +void BitrateProber::SetEnabled(bool enable) { + if (enable) { + if (probing_state_ == ProbingState::kDisabled) { + probing_state_ = ProbingState::kInactive; + LOG_INFO("Bandwidth probing enabled, set to inactive"); + } + } else { + probing_state_ = ProbingState::kDisabled; + LOG_INFO("Bandwidth probing disabled"); + } +} + +void BitrateProber::SetAllowProbeWithoutMediaPacket(bool allow) { + config_.allow_start_probing_immediately = allow; + MaybeSetActiveState(/*packet_size=*/DataSize::Zero()); +} + +void BitrateProber::MaybeSetActiveState(DataSize packet_size) { + if (ReadyToSetActiveState(packet_size)) { + next_probe_time_ = Timestamp::MinusInfinity(); + probing_state_ = ProbingState::kActive; + } +} + +bool BitrateProber::ReadyToSetActiveState(DataSize packet_size) const { + if (clusters_.empty()) { + return false; + } + switch (probing_state_) { + case ProbingState::kDisabled: + case ProbingState::kActive: + return false; + case ProbingState::kInactive: + if (config_.allow_start_probing_immediately) { + return true; + } + // If config_.min_packet_size > 0, a "large enough" packet must be + // sent first, before a probe can be generated and sent. Otherwise, + // send the probe asap. + return packet_size >= + std::min(RecommendedMinProbeSize(), config_.min_packet_size); + } + + return false; +} + +void BitrateProber::OnIncomingPacket(DataSize packet_size) { + MaybeSetActiveState(packet_size); +} + +void BitrateProber::CreateProbeCluster( + const ProbeClusterConfig& cluster_config) { + while (!clusters_.empty() && + (cluster_config.at_time - clusters_.front().requested_at > + kProbeClusterTimeout || + clusters_.size() > kMaxPendingProbeClusters)) { + clusters_.pop(); + } + + ProbeCluster cluster; + cluster.requested_at = cluster_config.at_time; + cluster.pace_info.probe_cluster_min_probes = + cluster_config.target_probe_count; + cluster.pace_info.probe_cluster_min_bytes = + (cluster_config.target_data_rate * cluster_config.target_duration) + .bytes(); + cluster.min_probe_delta = cluster_config.min_probe_delta; + cluster.pace_info.send_bitrate = cluster_config.target_data_rate; + cluster.pace_info.probe_cluster_id = cluster_config.id; + clusters_.push(cluster); + + MaybeSetActiveState(/*packet_size=*/DataSize::Zero()); + + LOG_INFO("Probe cluster (bitrate_bps:min bytes:min packets): ({}:{}:{}, {})", + cluster.pace_info.send_bitrate.bps(), + cluster.pace_info.probe_cluster_min_bytes, + cluster.pace_info.probe_cluster_min_probes, + probing_state_ == ProbingState::kInactive ? "Inactive" : "Active"); +} + +Timestamp BitrateProber::NextProbeTime(Timestamp /* now */) const { + // Probing is not active or probing is already complete. + if (probing_state_ != ProbingState::kActive || clusters_.empty()) { + return Timestamp::PlusInfinity(); + } + + return next_probe_time_; +} + +std::optional BitrateProber::CurrentCluster(Timestamp now) { + if (clusters_.empty() || probing_state_ != ProbingState::kActive) { + return std::nullopt; + } + + if (next_probe_time_.IsFinite() && + now - next_probe_time_ > config_.max_probe_delay) { + LOG_WARN( + "Probe delay too high (next_ms:{}, now_ms: {}), discarding probe " + "cluster.", + next_probe_time_.ms(), now.ms()); + clusters_.pop(); + if (clusters_.empty()) { + probing_state_ = ProbingState::kInactive; + return std::nullopt; + } + } + + PacedPacketInfo info = clusters_.front().pace_info; + info.probe_cluster_bytes_sent = clusters_.front().sent_bytes; + return info; +} + +DataSize BitrateProber::RecommendedMinProbeSize() const { + if (clusters_.empty()) { + return DataSize::Zero(); + } + DataRate send_rate = clusters_.front().pace_info.send_bitrate; + return send_rate * clusters_.front().min_probe_delta; +} + +void BitrateProber::ProbeSent(Timestamp now, DataSize size) { + if (!clusters_.empty()) { + ProbeCluster* cluster = &clusters_.front(); + if (cluster->sent_probes == 0) { + cluster->started_at = now; + } + cluster->sent_bytes += size.bytes(); + cluster->sent_probes += 1; + next_probe_time_ = CalculateNextProbeTime(*cluster); + if (cluster->sent_bytes >= cluster->pace_info.probe_cluster_min_bytes && + cluster->sent_probes >= cluster->pace_info.probe_cluster_min_probes) { + clusters_.pop(); + } + if (clusters_.empty()) { + probing_state_ = ProbingState::kInactive; + } + } +} + +Timestamp BitrateProber::CalculateNextProbeTime( + const ProbeCluster& cluster) const { + // Compute the time delta from the cluster start to ensure probe bitrate stays + // close to the target bitrate. Result is in milliseconds. + DataSize sent_bytes = DataSize::Bytes(cluster.sent_bytes); + DataRate send_bitrate = cluster.pace_info.send_bitrate; + + TimeDelta delta = sent_bytes / send_bitrate; + return cluster.started_at + delta; +} + +} // namespace webrtc diff --git a/src/qos/bitrate_prober.h b/src/qos/bitrate_prober.h new file mode 100644 index 0000000..3ffc253 --- /dev/null +++ b/src/qos/bitrate_prober.h @@ -0,0 +1,129 @@ +/* + * Copyright (c) 2014 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#ifndef MODULES_PACING_BITRATE_PROBER_H_ +#define MODULES_PACING_BITRATE_PROBER_H_ + +#include + +#include +#include + +#include "api/transport/network_types.h" +#include "api/units/data_size.h" +#include "api/units/time_delta.h" +#include "api/units/timestamp.h" + +namespace webrtc { + +struct BitrateProberConfig { + explicit BitrateProberConfig(); + BitrateProberConfig(const BitrateProberConfig&) = default; + BitrateProberConfig& operator=(const BitrateProberConfig&) = default; + ~BitrateProberConfig() = default; + + // Maximum amount of time each probe can be delayed. + TimeDelta max_probe_delay; + // This is used to start sending a probe after a large enough packet. + // The min packet size is scaled with the bitrate we're probing at. + // This defines the max min packet size, meaning that on high bitrates + // a packet of at least this size is needed to trigger sending a probe. + DataSize min_packet_size; + + // If true, `min_packet_size` is ignored. + bool allow_start_probing_immediately = false; +}; + +// Note that this class isn't thread-safe by itself and therefore relies +// on being protected by the caller. +class BitrateProber { + public: + explicit BitrateProber(); + ~BitrateProber() = default; + + void SetEnabled(bool enable); + void SetAllowProbeWithoutMediaPacket(bool allow); + + // Returns true if the prober is in a probing session, i.e., it currently + // wants packets to be sent out according to the time returned by + // TimeUntilNextProbe(). + bool is_probing() const { return probing_state_ == ProbingState::kActive; } + + // Initializes a new probing session if the prober is allowed to probe. Does + // not initialize the prober unless the packet size is large enough to probe + // with. + void OnIncomingPacket(DataSize packet_size); + + // Create a cluster used to probe. + void CreateProbeCluster(const ProbeClusterConfig& cluster_config); + // Returns the time at which the next probe should be sent to get accurate + // probing. If probing is not desired at this time, Timestamp::PlusInfinity() + // will be returned. + // TODO(bugs.webrtc.org/11780): Remove `now` argument when old mode is gone. + Timestamp NextProbeTime(Timestamp now) const; + + // Information about the current probing cluster. + std::optional CurrentCluster(Timestamp now); + + // Returns the minimum number of bytes that the prober recommends for + // the next probe, or zero if not probing. A probe can consist of multiple + // packets that are sent back to back. + DataSize RecommendedMinProbeSize() const; + + // Called to report to the prober that a probe has been sent. In case of + // multiple packets per probe, this call would be made at the end of sending + // the last packet in probe. `size` is the total size of all packets in probe. + void ProbeSent(Timestamp now, DataSize size); + + private: + enum class ProbingState { + // Probing will not be triggered in this state at all times. + kDisabled, + // Probing is enabled and ready to trigger on the first packet arrival if + // there is a probe cluster. + kInactive, + // Probe cluster is filled with the set of data rates to be probed and + // probes are being sent. + kActive, + }; + + // A probe cluster consists of a set of probes. Each probe in turn can be + // divided into a number of packets to accommodate the MTU on the network. + struct ProbeCluster { + PacedPacketInfo pace_info; + + int sent_probes = 0; + int sent_bytes = 0; + TimeDelta min_probe_delta = TimeDelta::Zero(); + Timestamp requested_at = Timestamp::MinusInfinity(); + Timestamp started_at = Timestamp::MinusInfinity(); + }; + + Timestamp CalculateNextProbeTime(const ProbeCluster& cluster) const; + + void MaybeSetActiveState(DataSize packet_size); + bool ReadyToSetActiveState(DataSize packet_size) const; + + ProbingState probing_state_; + + // Probe bitrate per packet. These are used to compute the delta relative to + // the previous probe packet based on the size and time when that packet was + // sent. + std::queue clusters_; + + // Time the next probe should be sent when in kActive state. + Timestamp next_probe_time_; + + BitrateProberConfig config_; +}; + +} // namespace webrtc + +#endif // MODULES_PACING_BITRATE_PROBER_H_ diff --git a/src/qos/pacing_controller.cc b/src/qos/pacing_controller.cc new file mode 100644 index 0000000..5156684 --- /dev/null +++ b/src/qos/pacing_controller.cc @@ -0,0 +1,691 @@ +/* + * Copyright (c) 2019 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#include "pacing_controller.h" + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "api/array_view.h" +#include "api/clock/clock.h" +#include "api/transport/network_types.h" +#include "api/units/data_rate.h" +#include "api/units/data_size.h" +#include "api/units/time_delta.h" +#include "api/units/timestamp.h" +#include "bitrate_prober.h" +#include "log.h" +#include "rtc_base/numerics/safe_conversions.h" +#include "rtp_rtcp_defines.h" + +namespace webrtc { +namespace { +constexpr TimeDelta kCongestedPacketInterval = TimeDelta::Millis(500); +// TODO(sprang): Consider dropping this limit. +// The maximum debt level, in terms of time, capped when sending packets. +constexpr TimeDelta kMaxDebtInTime = TimeDelta::Millis(500); +constexpr TimeDelta kMaxElapsedTime = TimeDelta::Seconds(2); + +} // namespace + +const TimeDelta PacingController::kPausedProcessInterval = + kCongestedPacketInterval; +const TimeDelta PacingController::kMinSleepTime = TimeDelta::Millis(1); +const TimeDelta PacingController::kTargetPaddingDuration = TimeDelta::Millis(5); +const TimeDelta PacingController::kMaxPaddingReplayDuration = + TimeDelta::Millis(50); +const TimeDelta PacingController::kMaxEarlyProbeProcessing = + TimeDelta::Millis(1); + +PacingController::PacingController(Clock* clock, PacketSender* packet_sender, + Configuration configuration) + : clock_(clock), + packet_sender_(packet_sender), + drain_large_queues_(configuration.drain_large_queues), + send_padding_if_silent_(true), + pace_audio_(false), + ignore_transport_overhead_(false), + fast_retransmissions_(true), + keyframe_flushing_(configuration.keyframe_flushing), + transport_overhead_per_packet_(DataSize::Zero()), + send_burst_interval_(configuration.send_burst_interval), + last_timestamp_(clock->CurrentTime()), + paused_(false), + media_debt_(DataSize::Zero()), + padding_debt_(DataSize::Zero()), + pacing_rate_(DataRate::Zero()), + adjusted_media_rate_(DataRate::Zero()), + padding_rate_(DataRate::Zero()), + probing_send_failure_(false), + last_process_time_(clock->CurrentTime()), + last_send_time_(last_process_time_), + seen_first_packet_(false), + packet_queue_(/*creation_time=*/last_process_time_, + configuration.prioritize_audio_retransmission, + configuration.packet_queue_ttl), + congested_(false), + queue_time_limit_(configuration.queue_time_limit), + account_for_audio_(false), + include_overhead_(false), + circuit_breaker_threshold_(1 << 16) { + if (!drain_large_queues_) { + LOG_WARN( + "Pacer queues will not be drained, pushback experiment must be " + "enabled."); + } +} + +PacingController::~PacingController() = default; + +void PacingController::CreateProbeClusters( + rtc::ArrayView probe_cluster_configs) { + for (const ProbeClusterConfig probe_cluster_config : probe_cluster_configs) { + prober_.CreateProbeCluster(probe_cluster_config); + } +} + +void PacingController::Pause() { + if (!paused_) { + LOG_INFO("PacedSender paused."); + } + paused_ = true; + packet_queue_.SetPauseState(true, CurrentTime()); +} + +void PacingController::Resume() { + if (paused_) { + LOG_INFO("PacedSender resumed."); + } + paused_ = false; + packet_queue_.SetPauseState(false, CurrentTime()); +} + +bool PacingController::IsPaused() const { return paused_; } + +void PacingController::SetCongested(bool congested) { + if (congested_ && !congested) { + UpdateBudgetWithElapsedTime(UpdateTimeAndGetElapsed(CurrentTime())); + } + congested_ = congested; +} + +void PacingController::SetCircuitBreakerThreshold(int num_iterations) { + circuit_breaker_threshold_ = num_iterations; +} + +void PacingController::RemovePacketsForSsrc(uint32_t ssrc) { + packet_queue_.RemovePacketsForSsrc(ssrc); +} + +bool PacingController::IsProbing() const { return prober_.is_probing(); } + +Timestamp PacingController::CurrentTime() const { + Timestamp time = clock_->CurrentTime(); + if (time < last_timestamp_) { + LOG_WARN( + "Non-monotonic clock behavior observed. Previous timestamp: {}, new " + "timestamp: {}", + last_timestamp_.ms(), time.ms()); + time = last_timestamp_; + } + last_timestamp_ = time; + return time; +} + +void PacingController::SetProbingEnabled(bool enabled) { + prober_.SetEnabled(enabled); +} + +void PacingController::SetPacingRates(DataRate pacing_rate, + DataRate padding_rate) { + if (padding_rate > pacing_rate) { + LOG_WARN( + "Padding rate {}kbps is higher than the pacing rate {}kbps, capping.", + padding_rate.kbps(), pacing_rate.kbps()); + padding_rate = pacing_rate; + } + + if (pacing_rate > max_rate || padding_rate > max_rate) { + LOG_WARN( + "Very high pacing rates ( > {} kbps) configured: pacing = {} kbps, " + "padding = {}kbps.", + max_rate.kbps(), pacing_rate.kbps(), padding_rate.kbps()); + max_rate = std::max(pacing_rate, padding_rate) * 1.1; + } + pacing_rate_ = pacing_rate; + padding_rate_ = padding_rate; + MaybeUpdateMediaRateDueToLongQueue(CurrentTime()); + + LOG_INFO("bwe:pacer_updated pacing_kbps={} padding_budget_kbps={}", + pacing_rate_.kbps(), padding_rate.kbps()); +} + +void PacingController::EnqueuePacket(std::unique_ptr packet) { + if (keyframe_flushing_ && + packet->packet_type() == RtpPacketMediaType::kVideo && + packet->is_key_frame() && packet->is_first_packet_of_frame() && + !packet_queue_.HasKeyframePackets(packet->Ssrc())) { + // First packet of a keyframe (and no keyframe packets currently in the + // queue). Flush any pending packets currently in the queue for that stream + // in order to get the new keyframe out as quickly as possible. + packet_queue_.RemovePacketsForSsrc(packet->Ssrc()); + std::optional rtx_ssrc = + packet_sender_->GetRtxSsrcForMedia(packet->Ssrc()); + if (rtx_ssrc) { + packet_queue_.RemovePacketsForSsrc(*rtx_ssrc); + } + } + + prober_.OnIncomingPacket(DataSize::Bytes(packet->payload_size())); + + const Timestamp now = CurrentTime(); + if (packet_queue_.Empty()) { + // If queue is empty, we need to "fast-forward" the last process time, + // so that we don't use passed time as budget for sending the first new + // packet. + Timestamp target_process_time = now; + Timestamp next_send_time = NextSendTime(); + if (next_send_time.IsFinite()) { + // There was already a valid planned send time, such as a keep-alive. + // Use that as last process time only if it's prior to now. + target_process_time = std::min(now, next_send_time); + } + UpdateBudgetWithElapsedTime(UpdateTimeAndGetElapsed(target_process_time)); + } + packet_queue_.Push(now, std::move(packet)); + seen_first_packet_ = true; + + // Queue length has increased, check if we need to change the pacing rate. + MaybeUpdateMediaRateDueToLongQueue(now); +} + +void PacingController::SetAccountForAudioPackets(bool account_for_audio) { + account_for_audio_ = account_for_audio; +} + +void PacingController::SetIncludeOverhead() { include_overhead_ = true; } + +void PacingController::SetTransportOverhead(DataSize overhead_per_packet) { + if (ignore_transport_overhead_) return; + transport_overhead_per_packet_ = overhead_per_packet; +} + +void PacingController::SetSendBurstInterval(TimeDelta burst_interval) { + send_burst_interval_ = burst_interval; +} + +void PacingController::SetAllowProbeWithoutMediaPacket(bool allow) { + prober_.SetAllowProbeWithoutMediaPacket(allow); +} + +TimeDelta PacingController::ExpectedQueueTime() const { + return QueueSizeData() / adjusted_media_rate_; +} + +size_t PacingController::QueueSizePackets() const { + return rtc::checked_cast(packet_queue_.SizeInPackets()); +} + +const std::array& +PacingController::SizeInPacketsPerRtpPacketMediaType() const { + return packet_queue_.SizeInPacketsPerRtpPacketMediaType(); +} + +DataSize PacingController::QueueSizeData() const { + DataSize size = packet_queue_.SizeInPayloadBytes(); + if (include_overhead_) { + size += static_cast(packet_queue_.SizeInPackets()) * + transport_overhead_per_packet_; + } + return size; +} + +DataSize PacingController::CurrentBufferLevel() const { + return std::max(media_debt_, padding_debt_); +} + +std::optional PacingController::FirstSentPacketTime() const { + return first_sent_packet_time_; +} + +Timestamp PacingController::OldestPacketEnqueueTime() const { + return packet_queue_.OldestEnqueueTime(); +} + +TimeDelta PacingController::UpdateTimeAndGetElapsed(Timestamp now) { + // If no previous processing, or last process was "in the future" because of + // early probe processing, then there is no elapsed time to add budget for. + if (last_process_time_.IsMinusInfinity() || now < last_process_time_) { + return TimeDelta::Zero(); + } + TimeDelta elapsed_time = now - last_process_time_; + last_process_time_ = now; + if (elapsed_time > kMaxElapsedTime) { + LOG_WARN("Elapsed time ({}) longer than expected, limiting to {}", + elapsed_time.seconds(), kMaxElapsedTime.seconds()); + elapsed_time = kMaxElapsedTime; + } + return elapsed_time; +} + +bool PacingController::ShouldSendKeepalive(Timestamp now) const { + if (send_padding_if_silent_ || paused_ || congested_ || !seen_first_packet_) { + // We send a padding packet every 500 ms to ensure we won't get stuck in + // congested state due to no feedback being received. + if (now - last_send_time_ >= kCongestedPacketInterval) { + return true; + } + } + return false; +} + +Timestamp PacingController::NextSendTime() const { + const Timestamp now = CurrentTime(); + Timestamp next_send_time = Timestamp::PlusInfinity(); + + if (paused_) { + return last_send_time_ + kPausedProcessInterval; + } + + // If probing is active, that always takes priority. + if (prober_.is_probing() && !probing_send_failure_) { + Timestamp probe_time = prober_.NextProbeTime(now); + if (!probe_time.IsPlusInfinity()) { + return probe_time.IsMinusInfinity() ? now : probe_time; + } + } + + // If queue contains a packet which should not be paced, its target send time + // is the time at which it was enqueued. + Timestamp unpaced_send_time = NextUnpacedSendTime(); + if (unpaced_send_time.IsFinite()) { + return unpaced_send_time; + } + + if (congested_ || !seen_first_packet_) { + // We need to at least send keep-alive packets with some interval. + return last_send_time_ + kCongestedPacketInterval; + } + + if (adjusted_media_rate_ > DataRate::Zero() && !packet_queue_.Empty()) { + // If packets are allowed to be sent in a burst, the + // debt is allowed to grow up to one packet more than what can be sent + // during 'send_burst_period_'. + TimeDelta drain_time = media_debt_ / adjusted_media_rate_; + // Ensure that a burst of sent packet is not larger than kMaxBurstSize in + // order to not risk overfilling socket buffers at high bitrate. + TimeDelta send_burst_interval = + std::min(send_burst_interval_, kMaxBurstSize / adjusted_media_rate_); + next_send_time = + last_process_time_ + + ((send_burst_interval > drain_time) ? TimeDelta::Zero() : drain_time); + } else if (padding_rate_ > DataRate::Zero() && packet_queue_.Empty()) { + // If we _don't_ have pending packets, check how long until we have + // bandwidth for padding packets. Both media and padding debts must + // have been drained to do this. + TimeDelta drain_time = std::max(media_debt_ / adjusted_media_rate_, + padding_debt_ / padding_rate_); + + if (drain_time.IsZero() && + (!media_debt_.IsZero() || !padding_debt_.IsZero())) { + // We have a non-zero debt, but drain time is smaller than tick size of + // TimeDelta, round it up to the smallest possible non-zero delta. + drain_time = TimeDelta::Micros(1); + } + next_send_time = last_process_time_ + drain_time; + } else { + // Nothing to do. + next_send_time = last_process_time_ + kPausedProcessInterval; + } + + if (send_padding_if_silent_) { + next_send_time = + std::min(next_send_time, last_send_time_ + kPausedProcessInterval); + } + + return next_send_time; +} + +void PacingController::ProcessPackets() { + auto cleanup = std::unique_ptr>( + nullptr, [packet_sender = packet_sender_](void*) { + packet_sender->OnBatchComplete(); + }); + const Timestamp now = CurrentTime(); + Timestamp target_send_time = now; + + if (ShouldSendKeepalive(now)) { + DataSize keepalive_data_sent = DataSize::Zero(); + // We can not send padding unless a normal packet has first been sent. If + // we do, timestamps get messed up. + if (seen_first_packet_) { + std::vector> keepalive_packets = + packet_sender_->GeneratePadding(DataSize::Bytes(1)); + for (auto& packet : keepalive_packets) { + keepalive_data_sent += + DataSize::Bytes(packet->payload_size() + packet->padding_size()); + packet_sender_->SendPacket(std::move(packet), PacedPacketInfo()); + for (auto& packet : packet_sender_->FetchFec()) { + EnqueuePacket(std::move(packet)); + } + } + } + OnPacketSent(RtpPacketMediaType::kPadding, keepalive_data_sent, now); + } + + if (paused_) { + return; + } + + TimeDelta early_execute_margin = + prober_.is_probing() ? kMaxEarlyProbeProcessing : TimeDelta::Zero(); + + target_send_time = NextSendTime(); + if (now + early_execute_margin < target_send_time) { + // We are too early, but if queue is empty still allow draining some debt. + // Probing is allowed to be sent up to kMinSleepTime early. + UpdateBudgetWithElapsedTime(UpdateTimeAndGetElapsed(now)); + return; + } + + TimeDelta elapsed_time = UpdateTimeAndGetElapsed(target_send_time); + + if (elapsed_time > TimeDelta::Zero()) { + UpdateBudgetWithElapsedTime(elapsed_time); + } + + PacedPacketInfo pacing_info; + DataSize recommended_probe_size = DataSize::Zero(); + bool is_probing = prober_.is_probing(); + if (is_probing) { + // Probe timing is sensitive, and handled explicitly by BitrateProber, so + // use actual send time rather than target. + pacing_info = prober_.CurrentCluster(now).value_or(PacedPacketInfo()); + if (pacing_info.probe_cluster_id != PacedPacketInfo::kNotAProbe) { + recommended_probe_size = prober_.RecommendedMinProbeSize(); + } else { + // No valid probe cluster returned, probe might have timed out. + is_probing = false; + } + } + + DataSize data_sent = DataSize::Zero(); + int iteration = 0; + int packets_sent = 0; + int padding_packets_generated = 0; + for (; iteration < circuit_breaker_threshold_; ++iteration) { + // Fetch packet, so long as queue is not empty or budget is not + // exhausted. + std::unique_ptr rtp_packet = + GetPendingPacket(pacing_info, target_send_time, now); + if (rtp_packet == nullptr) { + // No packet available to send, check if we should send padding. + if (now - target_send_time > kMaxPaddingReplayDuration) { + // The target send time is more than `kMaxPaddingReplayDuration` behind + // the real-time clock. This can happen if the clock is adjusted forward + // without `ProcessPackets()` having been called at the expected times. + target_send_time = now - kMaxPaddingReplayDuration; + last_process_time_ = std::max(last_process_time_, target_send_time); + } + + DataSize padding_to_add = PaddingToAdd(recommended_probe_size, data_sent); + if (padding_to_add > DataSize::Zero()) { + std::vector> padding_packets = + packet_sender_->GeneratePadding(padding_to_add); + if (!padding_packets.empty()) { + padding_packets_generated += padding_packets.size(); + for (auto& packet : padding_packets) { + EnqueuePacket(std::move(packet)); + } + // Continue loop to send the padding that was just added. + continue; + } else { + // Can't generate padding, still update padding budget for next send + // time. + UpdatePaddingBudgetWithSentData(padding_to_add); + } + } + // Can't fetch new packet and no padding to send, exit send loop. + break; + } else { + const RtpPacketMediaType packet_type = *rtp_packet->packet_type(); + DataSize packet_size = DataSize::Bytes(rtp_packet->payload_size() + + rtp_packet->padding_size()); + + if (include_overhead_) { + packet_size += DataSize::Bytes(rtp_packet->headers_size()) + + transport_overhead_per_packet_; + } + + packet_sender_->SendPacket(std::move(rtp_packet), pacing_info); + for (auto& packet : packet_sender_->FetchFec()) { + EnqueuePacket(std::move(packet)); + } + data_sent += packet_size; + ++packets_sent; + + // Send done, update send time. + OnPacketSent(packet_type, packet_size, now); + + if (is_probing) { + pacing_info.probe_cluster_bytes_sent += packet_size.bytes(); + // If we are currently probing, we need to stop the send loop when we + // have reached the send target. + if (data_sent >= recommended_probe_size) { + break; + } + } + + // Update target send time in case that are more packets that we are late + // in processing. + target_send_time = NextSendTime(); + if (target_send_time > now) { + // Exit loop if not probing. + if (!is_probing) { + break; + } + target_send_time = now; + } + UpdateBudgetWithElapsedTime(UpdateTimeAndGetElapsed(target_send_time)); + } + } + + if (iteration >= circuit_breaker_threshold_) { + // Circuit break activated. Log warning, adjust send time and return. + // TODO(sprang): Consider completely clearing state. + LOG_ERROR( + "PacingController exceeded max iterations in send-loop. Debug info: " + "packets sent = {}, padding packets generated = {}, bytes sent = {}, " + "probing = {}, recommended_probe_size = {}, now = {}, target_send_time " + "= {}, " + "last_process_time = {}, last_send_time = {}, paused = {}, media_debt " + "= {}, " + "padding_debt = {}, pacing_rate = {}, adjusted_media_rate = {}, " + "padding_rate = {}, " + "queue size (packets) = {}, queue size (payload bytes) = {}", + packets_sent, padding_packets_generated, data_sent.bytes(), + (is_probing ? "true" : "false"), recommended_probe_size.bytes(), + now.us(), target_send_time.us(), last_process_time_.us(), + last_send_time_.us(), (paused_ ? "true" : "false"), media_debt_.bytes(), + padding_debt_.bytes(), pacing_rate_.bps(), adjusted_media_rate_.bps(), + padding_rate_.bps(), packet_queue_.SizeInPackets(), + ToString(packet_queue_.SizeInPayloadBytes())); + last_send_time_ = now; + last_process_time_ = now; + return; + } + + if (is_probing) { + probing_send_failure_ = data_sent == DataSize::Zero(); + if (!probing_send_failure_) { + prober_.ProbeSent(CurrentTime(), data_sent); + } + } + + // Queue length has probably decreased, check if pacing rate needs to updated. + // Poll the time again, since we might have enqueued new fec/padding packets + // with a later timestamp than `now`. + MaybeUpdateMediaRateDueToLongQueue(CurrentTime()); +} + +DataSize PacingController::PaddingToAdd(DataSize recommended_probe_size, + DataSize data_sent) const { + if (!packet_queue_.Empty()) { + // Actual payload available, no need to add padding. + return DataSize::Zero(); + } + + if (congested_) { + // Don't add padding if congested, even if requested for probing. + return DataSize::Zero(); + } + + if (!recommended_probe_size.IsZero()) { + if (recommended_probe_size > data_sent) { + return recommended_probe_size - data_sent; + } + return DataSize::Zero(); + } + + if (padding_rate_ > DataRate::Zero() && padding_debt_ == DataSize::Zero()) { + return kTargetPaddingDuration * padding_rate_; + } + return DataSize::Zero(); +} + +std::unique_ptr PacingController::GetPendingPacket( + const PacedPacketInfo& pacing_info, Timestamp target_send_time, + Timestamp now) { + const bool is_probe = + pacing_info.probe_cluster_id != PacedPacketInfo::kNotAProbe; + // If first packet in probe, insert a small padding packet so we have a + // more reliable start window for the rate estimation. + if (is_probe && pacing_info.probe_cluster_bytes_sent == 0) { + auto padding = packet_sender_->GeneratePadding(DataSize::Bytes(1)); + // If no RTP modules sending media are registered, we may not get a + // padding packet back. + if (!padding.empty()) { + // We should never get more than one padding packets with a requested + // size of 1 byte. + return std::move(padding[0]); + } + } + + if (packet_queue_.Empty()) { + return nullptr; + } + + // First, check if there is any reason _not_ to send the next queued packet. + // Unpaced packets and probes are exempted from send checks. + if (NextUnpacedSendTime().IsInfinite() && !is_probe) { + if (congested_) { + // Don't send anything if congested. + return nullptr; + } + + if (now <= target_send_time && send_burst_interval_.IsZero()) { + // We allow sending slightly early if we think that we would actually + // had been able to, had we been right on time - i.e. the current debt + // is not more than would be reduced to zero at the target sent time. + // If we allow packets to be sent in a burst, packet are allowed to be + // sent early. + TimeDelta flush_time = media_debt_ / adjusted_media_rate_; + if (now + flush_time > target_send_time) { + return nullptr; + } + } + } + + return packet_queue_.Pop(); +} + +void PacingController::OnPacketSent(RtpPacketMediaType packet_type, + DataSize packet_size, Timestamp send_time) { + if (!first_sent_packet_time_ && packet_type != RtpPacketMediaType::kPadding) { + first_sent_packet_time_ = send_time; + } + + bool audio_packet = packet_type == RtpPacketMediaType::kAudio; + if ((!audio_packet || account_for_audio_) && packet_size > DataSize::Zero()) { + UpdateBudgetWithSentData(packet_size); + } + + last_send_time_ = send_time; +} + +void PacingController::UpdateBudgetWithElapsedTime(TimeDelta delta) { + media_debt_ -= std::min(media_debt_, adjusted_media_rate_ * delta); + padding_debt_ -= std::min(padding_debt_, padding_rate_ * delta); +} + +void PacingController::UpdateBudgetWithSentData(DataSize size) { + media_debt_ += size; + media_debt_ = std::min(media_debt_, adjusted_media_rate_ * kMaxDebtInTime); + UpdatePaddingBudgetWithSentData(size); +} + +void PacingController::UpdatePaddingBudgetWithSentData(DataSize size) { + padding_debt_ += size; + padding_debt_ = std::min(padding_debt_, padding_rate_ * kMaxDebtInTime); +} + +void PacingController::SetQueueTimeLimit(TimeDelta limit) { + queue_time_limit_ = limit; +} + +void PacingController::MaybeUpdateMediaRateDueToLongQueue(Timestamp now) { + adjusted_media_rate_ = pacing_rate_; + if (!drain_large_queues_) { + return; + } + + DataSize queue_size_data = QueueSizeData(); + if (queue_size_data > DataSize::Zero()) { + // Assuming equal size packets and input/output rate, the average packet + // has avg_time_left_ms left to get queue_size_bytes out of the queue, if + // time constraint shall be met. Determine bitrate needed for that. + packet_queue_.UpdateAverageQueueTime(now); + TimeDelta avg_time_left = + std::max(TimeDelta::Millis(1), + queue_time_limit_ - packet_queue_.AverageQueueTime()); + DataRate min_rate_needed = queue_size_data / avg_time_left; + if (min_rate_needed > pacing_rate_) { + adjusted_media_rate_ = min_rate_needed; + LOG_INFO("bwe:large_pacing_queue pacing_rate_kbps={}", + pacing_rate_.kbps()); + } + } +} + +Timestamp PacingController::NextUnpacedSendTime() const { + if (!pace_audio_) { + Timestamp leading_audio_send_time = + packet_queue_.LeadingPacketEnqueueTime(RtpPacketMediaType::kAudio); + if (leading_audio_send_time.IsFinite()) { + return leading_audio_send_time; + } + } + if (fast_retransmissions_) { + Timestamp leading_retransmission_send_time = + packet_queue_.LeadingPacketEnqueueTimeForRetransmission(); + if (leading_retransmission_send_time.IsFinite()) { + return leading_retransmission_send_time; + } + } + return Timestamp::MinusInfinity(); +} + +} // namespace webrtc diff --git a/src/qos/pacing_controller.h b/src/qos/pacing_controller.h new file mode 100644 index 0000000..7b7304a --- /dev/null +++ b/src/qos/pacing_controller.h @@ -0,0 +1,284 @@ +/* + * Copyright (c) 2019 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#ifndef MODULES_PACING_PACING_CONTROLLER_H_ +#define MODULES_PACING_PACING_CONTROLLER_H_ + +#include +#include + +#include +#include +#include +#include + +#include "api/array_view.h" +#include "api/clock/clock.h" +#include "api/transport/network_types.h" +#include "api/units/data_rate.h" +#include "api/units/data_size.h" +#include "api/units/time_delta.h" +#include "api/units/timestamp.h" +#include "bitrate_prober.h" +#include "prioritized_packet_queue.h" +#include "rtp_packet_to_send.h" +#include "rtp_rtcp_defines.h" + +namespace webrtc { + +// This class implements a leaky-bucket packet pacing algorithm. It handles the +// logic of determining which packets to send when, but the actual timing of +// the processing is done externally (e.g. RtpPacketPacer). Furthermore, the +// forwarding of packets when they are ready to be sent is also handled +// externally, via the PacingController::PacketSender interface. +class PacingController { + public: + class PacketSender { + public: + virtual ~PacketSender() = default; + virtual void SendPacket(std::unique_ptr packet, + const PacedPacketInfo& cluster_info) = 0; + // Should be called after each call to SendPacket(). + virtual std::vector> FetchFec() = 0; + virtual std::vector> GeneratePadding( + DataSize size) = 0; + // TODO(bugs.webrtc.org/1439830): Make pure virtual once subclasses adapt. + virtual void OnBatchComplete() {} + + // TODO(bugs.webrtc.org/11340): Make pure virtual once downstream projects + // have been updated. + virtual void OnAbortedRetransmissions( + uint32_t /* ssrc */, + rtc::ArrayView /* sequence_numbers */) {} + virtual std::optional GetRtxSsrcForMedia( + uint32_t /* ssrc */) const { + return std::nullopt; + } + }; + + // If no media or paused, wake up at least every `kPausedProcessIntervalMs` in + // order to send a keep-alive packet so we don't get stuck in a bad state due + // to lack of feedback. + static const TimeDelta kPausedProcessInterval; + // The default minimum time that should elapse calls to `ProcessPackets()`. + static const TimeDelta kMinSleepTime; + // When padding should be generated, add packets to the buffer with a size + // corresponding to this duration times the current padding rate. + static const TimeDelta kTargetPaddingDuration; + // The maximum time that the pacer can use when "replaying" passed time where + // padding should have been generated. + static const TimeDelta kMaxPaddingReplayDuration; + // Allow probes to be processed slightly ahead of inteded send time. Currently + // set to 1ms as this is intended to allow times be rounded down to the + // nearest millisecond. + static const TimeDelta kMaxEarlyProbeProcessing; + // Max total size of packets expected to be sent in a burst in order to not + // risk loosing packets due to too small send socket buffers. It upper limits + // the send burst interval. + // Ex: max send burst interval = 63Kb / 10Mbit/s = 50ms. + static constexpr DataSize kMaxBurstSize = DataSize::Bytes(63 * 1000); + + // Configuration default values. + static constexpr TimeDelta kDefaultBurstInterval = TimeDelta::Millis(40); + static constexpr TimeDelta kMaxExpectedQueueLength = TimeDelta::Millis(2000); + + struct Configuration { + // If the pacer queue grows longer than the configured max queue limit, + // pacer sends at the minimum rate needed to keep the max queue limit and + // ignore the current bandwidth estimate. + bool drain_large_queues = true; + // Expected max pacer delay. If ExpectedQueueTime() is higher than + // this value, the packet producers should wait (eg drop frames rather than + // encoding them). Bitrate sent may temporarily exceed target set by + // SetPacingRates() so that this limit will be upheld if + // `drain_large_queues` is set. + TimeDelta queue_time_limit = kMaxExpectedQueueLength; + // If the first packet of a keyframe is enqueued on a RTP stream, pacer + // skips forward to that packet and drops other enqueued packets on that + // stream, unless a keyframe is already being paced. + bool keyframe_flushing = false; + // Audio retransmission is prioritized before video retransmission packets. + bool prioritize_audio_retransmission = false; + // Configure separate timeouts per priority. After a timeout, a packet of + // that sort will not be paced and instead dropped. + // Note: to set TTL on audio retransmission, + // `prioritize_audio_retransmission` must be true. + PacketQueueTTL packet_queue_ttl; + // The pacer is allowed to send enqueued packets in bursts and can build up + // a packet "debt" that correspond to approximately the send rate during the + // burst interval. + TimeDelta send_burst_interval = kDefaultBurstInterval; + }; + + static Configuration DefaultConfiguration() { return Configuration{}; } + + PacingController(Clock* clock, PacketSender* packet_sender, + Configuration configuration = DefaultConfiguration()); + + ~PacingController(); + + // Adds the packet to the queue and calls PacketRouter::SendPacket() when + // it's time to send. + void EnqueuePacket(std::unique_ptr packet); + + void CreateProbeClusters( + rtc::ArrayView probe_cluster_configs); + + void Pause(); // Temporarily pause all sending. + void Resume(); // Resume sending packets. + bool IsPaused() const; + + void SetCongested(bool congested); + + // Sets the pacing rates. Must be called once before packets can be sent. + void SetPacingRates(DataRate pacing_rate, DataRate padding_rate); + DataRate pacing_rate() const { return adjusted_media_rate_; } + + // Currently audio traffic is not accounted by pacer and passed through. + // With the introduction of audio BWE audio traffic will be accounted for + // the pacer budget calculation. The audio traffic still will be injected + // at high priority. + void SetAccountForAudioPackets(bool account_for_audio); + void SetIncludeOverhead(); + + void SetTransportOverhead(DataSize overhead_per_packet); + // The pacer is allowed to send enqued packets in bursts and can build up a + // packet "debt" that correspond to approximately the send rate during + // 'burst_interval'. + void SetSendBurstInterval(TimeDelta burst_interval); + + // A probe may be sent without first waing for a media packet. + void SetAllowProbeWithoutMediaPacket(bool allow); + + // Returns the time when the oldest packet was queued. + Timestamp OldestPacketEnqueueTime() const; + + // Number of packets in the pacer queue. + size_t QueueSizePackets() const; + // Number of packets in the pacer queue per media type (RtpPacketMediaType + // values are used as lookup index). + const std::array& SizeInPacketsPerRtpPacketMediaType() + const; + // Totals size of packets in the pacer queue. + DataSize QueueSizeData() const; + + // Current buffer level, i.e. max of media and padding debt. + DataSize CurrentBufferLevel() const; + + // Returns the time when the first packet was sent. + std::optional FirstSentPacketTime() const; + + // Returns the number of milliseconds it will take to send the current + // packets in the queue, given the current size and bitrate, ignoring prio. + TimeDelta ExpectedQueueTime() const; + + void SetQueueTimeLimit(TimeDelta limit); + + // Enable bitrate probing. Enabled by default, mostly here to simplify + // testing. Must be called before any packets are being sent to have an + // effect. + void SetProbingEnabled(bool enabled); + + // Returns the next time we expect ProcessPackets() to be called. + Timestamp NextSendTime() const; + + // Check queue of pending packets and send them or padding packets, if budget + // is available. + void ProcessPackets(); + + bool IsProbing() const; + + // Note: Intended for debugging purposes only, will be removed. + // Sets the number of iterations of the main loop in `ProcessPackets()` that + // is considered erroneous to exceed. + void SetCircuitBreakerThreshold(int num_iterations); + + // Remove any pending packets matching this SSRC from the packet queue. + void RemovePacketsForSsrc(uint32_t ssrc); + + private: + TimeDelta UpdateTimeAndGetElapsed(Timestamp now); + bool ShouldSendKeepalive(Timestamp now) const; + + // Updates the number of bytes that can be sent for the next time interval. + void UpdateBudgetWithElapsedTime(TimeDelta delta); + void UpdateBudgetWithSentData(DataSize size); + void UpdatePaddingBudgetWithSentData(DataSize size); + + DataSize PaddingToAdd(DataSize recommended_probe_size, + DataSize data_sent) const; + + std::unique_ptr GetPendingPacket( + const PacedPacketInfo& pacing_info, Timestamp target_send_time, + Timestamp now); + void OnPacketSent(RtpPacketMediaType packet_type, DataSize packet_size, + Timestamp send_time); + void MaybeUpdateMediaRateDueToLongQueue(Timestamp now); + + Timestamp CurrentTime() const; + + // Helper methods for packet that may not be paced. Returns a finite Timestamp + // if a packet type is configured to not be paced and the packet queue has at + // least one packet of that type. Otherwise returns + // Timestamp::MinusInfinity(). + Timestamp NextUnpacedSendTime() const; + + Clock* const clock_; + PacketSender* const packet_sender_; + + const bool drain_large_queues_; + const bool send_padding_if_silent_; + const bool pace_audio_; + const bool ignore_transport_overhead_; + const bool fast_retransmissions_; + const bool keyframe_flushing_; + DataRate max_rate = DataRate::BitsPerSec(100'000'000); + DataSize transport_overhead_per_packet_; + TimeDelta send_burst_interval_; + + // TODO(webrtc:9716): Remove this when we are certain clocks are monotonic. + // The last millisecond timestamp returned by `clock_`. + mutable Timestamp last_timestamp_; + bool paused_; + + // Amount of outstanding data for media and padding. + DataSize media_debt_; + DataSize padding_debt_; + + // The target pacing rate, signaled via SetPacingRates(). + DataRate pacing_rate_; + // The media send rate, which might adjusted from pacing_rate_, e.g. if the + // pacing queue is growing too long. + DataRate adjusted_media_rate_; + // The padding target rate. We aim to fill up to this rate with padding what + // is not already used by media. + DataRate padding_rate_; + + BitrateProber prober_; + bool probing_send_failure_; + + Timestamp last_process_time_; + Timestamp last_send_time_; + std::optional first_sent_packet_time_; + bool seen_first_packet_; + + PrioritizedPacketQueue packet_queue_; + + bool congested_; + + TimeDelta queue_time_limit_; + bool account_for_audio_; + bool include_overhead_; + + int circuit_breaker_threshold_; +}; +} // namespace webrtc + +#endif // MODULES_PACING_PACING_CONTROLLER_H_ diff --git a/src/qos/prioritized_packet_queue.cc b/src/qos/prioritized_packet_queue.cc new file mode 100644 index 0000000..18b2cfa --- /dev/null +++ b/src/qos/prioritized_packet_queue.cc @@ -0,0 +1,443 @@ +/* + * Copyright (c) 2022 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#include "prioritized_packet_queue.h" + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "api/units/data_size.h" +#include "api/units/time_delta.h" +#include "api/units/timestamp.h" +#include "inlined_vector.h" +#include "log.h" +#include "rtp_packet_to_send.h" +#include "rtp_rtcp_defines.h" + +namespace webrtc { +namespace { + +constexpr int kAudioPrioLevel = 0; + +int GetPriorityForType( + RtpPacketMediaType type, + std::optional original_type) { + // Lower number takes priority over higher. + switch (type) { + case RtpPacketMediaType::kAudio: + // Audio is always prioritized over other packet types. + return kAudioPrioLevel; + case RtpPacketMediaType::kRetransmission: + // Send retransmissions before new media. If original_type is set, audio + // retransmission is prioritized more than video retransmission. + if (original_type == RtpPacketToSend::OriginalType::kVideo) { + return kAudioPrioLevel + 2; + } + return kAudioPrioLevel + 1; + case RtpPacketMediaType::kVideo: + case RtpPacketMediaType::kForwardErrorCorrection: + // Video has "normal" priority, in the old speak. + // Send redundancy concurrently to video. If it is delayed it might have a + // lower chance of being useful. + return kAudioPrioLevel + 3; + case RtpPacketMediaType::kPadding: + // Packets that are in themselves likely useless, only sent to keep the + // BWE high. + return kAudioPrioLevel + 4; + } + return -1; +} + +} // namespace + +InlinedVector +PrioritizedPacketQueue::ToTtlPerPrio(PacketQueueTTL packet_queue_ttl) { + InlinedVector + ttl_per_prio; + ttl_per_prio.assign(kNumPriorityLevels, TimeDelta::PlusInfinity()); + ttl_per_prio[GetPriorityForType(RtpPacketMediaType::kRetransmission, + RtpPacketToSend::OriginalType::kAudio)] = + packet_queue_ttl.audio_retransmission; + ttl_per_prio[GetPriorityForType(RtpPacketMediaType::kRetransmission, + RtpPacketToSend::OriginalType::kVideo)] = + packet_queue_ttl.video_retransmission; + ttl_per_prio[GetPriorityForType(RtpPacketMediaType::kVideo, std::nullopt)] = + packet_queue_ttl.video; + return ttl_per_prio; +} + +DataSize PrioritizedPacketQueue::QueuedPacket::PacketSize() const { + return DataSize::Bytes(packet->payload_size() + packet->padding_size()); +} + +PrioritizedPacketQueue::StreamQueue::StreamQueue(Timestamp creation_time) + : last_enqueue_time_(creation_time), num_keyframe_packets_(0) {} + +bool PrioritizedPacketQueue::StreamQueue::EnqueuePacket(QueuedPacket packet, + int priority_level) { + if (packet.packet->is_key_frame()) { + ++num_keyframe_packets_; + } + bool first_packet_at_level = packets_[priority_level].empty(); + packets_[priority_level].push_back(std::move(packet)); + return first_packet_at_level; +} + +PrioritizedPacketQueue::QueuedPacket +PrioritizedPacketQueue::StreamQueue::DequeuePacket(int priority_level) { + QueuedPacket packet = std::move(packets_[priority_level].front()); + packets_[priority_level].pop_front(); + if (packet.packet->is_key_frame()) { + --num_keyframe_packets_; + } + return packet; +} + +bool PrioritizedPacketQueue::StreamQueue::HasPacketsAtPrio( + int priority_level) const { + return !packets_[priority_level].empty(); +} + +bool PrioritizedPacketQueue::StreamQueue::IsEmpty() const { + for (const std::deque& queue : packets_) { + if (!queue.empty()) { + return false; + } + } + return true; +} + +Timestamp PrioritizedPacketQueue::StreamQueue::LeadingPacketEnqueueTime( + int priority_level) const { + return packets_[priority_level].begin()->enqueue_time; +} + +Timestamp PrioritizedPacketQueue::StreamQueue::LastEnqueueTime() const { + return last_enqueue_time_; +} + +std::array, + PrioritizedPacketQueue::kNumPriorityLevels> +PrioritizedPacketQueue::StreamQueue::DequeueAll() { + std::array, kNumPriorityLevels> packets_by_prio; + for (int i = 0; i < kNumPriorityLevels; ++i) { + packets_by_prio[i].swap(packets_[i]); + } + num_keyframe_packets_ = 0; + return packets_by_prio; +} + +PrioritizedPacketQueue::PrioritizedPacketQueue( + Timestamp creation_time, bool prioritize_audio_retransmission, + PacketQueueTTL packet_queue_ttl) + : prioritize_audio_retransmission_(prioritize_audio_retransmission), + time_to_live_per_prio_(ToTtlPerPrio(packet_queue_ttl)), + queue_time_sum_(TimeDelta::Zero()), + pause_time_sum_(TimeDelta::Zero()), + size_packets_(0), + size_packets_per_media_type_({}), + size_payload_(DataSize::Zero()), + last_update_time_(creation_time), + paused_(false), + last_culling_time_(creation_time), + top_active_prio_level_(-1) {} + +void PrioritizedPacketQueue::Push(Timestamp enqueue_time, + std::unique_ptr packet) { + StreamQueue* stream_queue; + auto [it, inserted] = streams_.emplace(packet->Ssrc(), nullptr); + if (inserted) { + it->second = std::make_unique(enqueue_time); + } + stream_queue = it->second.get(); + + auto enqueue_time_iterator = + enqueue_times_.insert(enqueue_times_.end(), enqueue_time); + RtpPacketMediaType packet_type = packet->packet_type().value(); + int prio_level = + GetPriorityForType(packet_type, prioritize_audio_retransmission_ + ? packet->original_packet_type() + : std::nullopt); + PurgeOldPacketsAtPriorityLevel(prio_level, enqueue_time); + QueuedPacket queued_packed = {std::move(packet), enqueue_time, + enqueue_time_iterator}; + // In order to figure out how much time a packet has spent in the queue + // while not in a paused state, we subtract the total amount of time the + // queue has been paused so far, and when the packet is popped we subtract + // the total amount of time the queue has been paused at that moment. This + // way we subtract the total amount of time the packet has spent in the + // queue while in a paused state. + UpdateAverageQueueTime(enqueue_time); + queued_packed.enqueue_time -= pause_time_sum_; + ++size_packets_; + ++size_packets_per_media_type_[static_cast(packet_type)]; + size_payload_ += queued_packed.PacketSize(); + + if (stream_queue->EnqueuePacket(std::move(queued_packed), prio_level)) { + // Number packets at `prio_level` for this steam is now non-zero. + streams_by_prio_[prio_level].push_back(stream_queue); + } + if (top_active_prio_level_ < 0 || prio_level < top_active_prio_level_) { + top_active_prio_level_ = prio_level; + } + + static constexpr TimeDelta kTimeout = TimeDelta::Millis(500); + if (enqueue_time - last_culling_time_ > kTimeout) { + for (auto it = streams_.begin(); it != streams_.end();) { + if (it->second->IsEmpty() && + it->second->LastEnqueueTime() + kTimeout < enqueue_time) { + streams_.erase(it++); + } else { + ++it; + } + } + last_culling_time_ = enqueue_time; + } +} + +std::unique_ptr PrioritizedPacketQueue::Pop() { + if (size_packets_ == 0) { + return nullptr; + } + + StreamQueue& stream_queue = *streams_by_prio_[top_active_prio_level_].front(); + QueuedPacket packet = stream_queue.DequeuePacket(top_active_prio_level_); + DequeuePacketInternal(packet); + + // Remove StreamQueue from head of fifo-queue for this prio level, and + // and add it to the end if it still has packets. + streams_by_prio_[top_active_prio_level_].pop_front(); + if (stream_queue.HasPacketsAtPrio(top_active_prio_level_)) { + streams_by_prio_[top_active_prio_level_].push_back(&stream_queue); + } else { + MaybeUpdateTopPrioLevel(); + } + + return std::move(packet.packet); +} + +int PrioritizedPacketQueue::SizeInPackets() const { return size_packets_; } + +DataSize PrioritizedPacketQueue::SizeInPayloadBytes() const { + return size_payload_; +} + +bool PrioritizedPacketQueue::Empty() const { return size_packets_ == 0; } + +const std::array& +PrioritizedPacketQueue::SizeInPacketsPerRtpPacketMediaType() const { + return size_packets_per_media_type_; +} + +Timestamp PrioritizedPacketQueue::LeadingPacketEnqueueTime( + RtpPacketMediaType type) const { + const int priority_level = GetPriorityForType(type, std::nullopt); + if (streams_by_prio_[priority_level].empty()) { + return Timestamp::MinusInfinity(); + } + return streams_by_prio_[priority_level].front()->LeadingPacketEnqueueTime( + priority_level); +} + +Timestamp PrioritizedPacketQueue::LeadingPacketEnqueueTimeForRetransmission() + const { + if (!prioritize_audio_retransmission_) { + const int priority_level = + GetPriorityForType(RtpPacketMediaType::kRetransmission, std::nullopt); + if (streams_by_prio_[priority_level].empty()) { + return Timestamp::PlusInfinity(); + } + return streams_by_prio_[priority_level].front()->LeadingPacketEnqueueTime( + priority_level); + } + const int audio_priority_level = + GetPriorityForType(RtpPacketMediaType::kRetransmission, + RtpPacketToSend::OriginalType::kAudio); + const int video_priority_level = + GetPriorityForType(RtpPacketMediaType::kRetransmission, + RtpPacketToSend::OriginalType::kVideo); + + Timestamp next_audio = + streams_by_prio_[audio_priority_level].empty() + ? Timestamp::PlusInfinity() + : streams_by_prio_[audio_priority_level] + .front() + ->LeadingPacketEnqueueTime(audio_priority_level); + Timestamp next_video = + streams_by_prio_[video_priority_level].empty() + ? Timestamp::PlusInfinity() + : streams_by_prio_[video_priority_level] + .front() + ->LeadingPacketEnqueueTime(video_priority_level); + return std::min(next_audio, next_video); +} + +Timestamp PrioritizedPacketQueue::OldestEnqueueTime() const { + return enqueue_times_.empty() ? Timestamp::MinusInfinity() + : enqueue_times_.front(); +} + +TimeDelta PrioritizedPacketQueue::AverageQueueTime() const { + if (size_packets_ == 0) { + return TimeDelta::Zero(); + } + return queue_time_sum_ / size_packets_; +} + +void PrioritizedPacketQueue::UpdateAverageQueueTime(Timestamp now) { + if (now == last_update_time_) { + return; + } + + TimeDelta delta = now - last_update_time_; + + if (paused_) { + pause_time_sum_ += delta; + } else { + queue_time_sum_ += delta * size_packets_; + } + + last_update_time_ = now; +} + +void PrioritizedPacketQueue::SetPauseState(bool paused, Timestamp now) { + UpdateAverageQueueTime(now); + paused_ = paused; +} + +void PrioritizedPacketQueue::RemovePacketsForSsrc(uint32_t ssrc) { + auto kv = streams_.find(ssrc); + if (kv != streams_.end()) { + // Dequeue all packets from the queue for this SSRC. + StreamQueue& queue = *kv->second; + std::array, kNumPriorityLevels> packets_by_prio = + queue.DequeueAll(); + for (int i = 0; i < kNumPriorityLevels; ++i) { + std::deque& packet_queue = packets_by_prio[i]; + if (packet_queue.empty()) { + continue; + } + + // First erase all packets at this prio level. + while (!packet_queue.empty()) { + QueuedPacket packet = std::move(packet_queue.front()); + packet_queue.pop_front(); + DequeuePacketInternal(packet); + } + + // Next, deregister this `StreamQueue` from the round-robin tables. + if (streams_by_prio_[i].size() == 1) { + // This is the last and only queue that had packets for this prio level. + // Update the global top prio level if neccessary. + streams_by_prio_[i].pop_front(); + } else { + // More than stream had packets at this prio level, filter this one out. + std::deque filtered_queue; + for (StreamQueue* queue_ptr : streams_by_prio_[i]) { + if (queue_ptr != &queue) { + filtered_queue.push_back(queue_ptr); + } + } + streams_by_prio_[i].swap(filtered_queue); + } + } + } + MaybeUpdateTopPrioLevel(); +} + +bool PrioritizedPacketQueue::HasKeyframePackets(uint32_t ssrc) const { + auto it = streams_.find(ssrc); + if (it != streams_.end()) { + return it->second->has_keyframe_packets(); + } + return false; +} + +void PrioritizedPacketQueue::DequeuePacketInternal(QueuedPacket& packet) { + --size_packets_; + RtpPacketMediaType packet_type = packet.packet->packet_type().value(); + --size_packets_per_media_type_[static_cast(packet_type)]; + size_payload_ -= packet.PacketSize(); + + // Calculate the total amount of time spent by this packet in the queue + // while in a non-paused state. Note that the `pause_time_sum_ms_` was + // subtracted from `packet.enqueue_time_ms` when the packet was pushed, and + // by subtracting it now we effectively remove the time spent in in the + // queue while in a paused state. + TimeDelta time_in_non_paused_state = + last_update_time_ - packet.enqueue_time - pause_time_sum_; + queue_time_sum_ -= time_in_non_paused_state; + + // Set the time spent in the send queue, which is the per-packet equivalent of + // totalPacketSendDelay. The notion of being paused is an implementation + // detail that we do not want to expose, so it makes sense to report the + // metric excluding the pause time. This also avoids spikes in the metric. + // https://w3c.github.io/webrtc-stats/#dom-rtcoutboundrtpstreamstats-totalpacketsenddelay + packet.packet->set_time_in_send_queue(time_in_non_paused_state); + + enqueue_times_.erase(packet.enqueue_time_iterator); +} + +void PrioritizedPacketQueue::MaybeUpdateTopPrioLevel() { + if (top_active_prio_level_ != -1 && + !streams_by_prio_[top_active_prio_level_].empty()) { + return; + } + // No stream queues have packets at top_active_prio_level_, find top priority + // that is not empty. + for (int i = 0; i < kNumPriorityLevels; ++i) { + PurgeOldPacketsAtPriorityLevel(i, last_update_time_); + if (!streams_by_prio_[i].empty()) { + top_active_prio_level_ = i; + break; + } + } + if (size_packets_ == 0) { + // There are no packets left to send. Last packet may have been purged. Prio + // will change when a new packet is pushed. + top_active_prio_level_ = -1; + } +} + +void PrioritizedPacketQueue::PurgeOldPacketsAtPriorityLevel(int prio_level, + Timestamp now) { + TimeDelta time_to_live = time_to_live_per_prio_[prio_level]; + if (time_to_live.IsInfinite()) { + return; + } + + std::deque& queues = streams_by_prio_[prio_level]; + auto iter = queues.begin(); + while (iter != queues.end()) { + StreamQueue* queue_ptr = *iter; + while (queue_ptr->HasPacketsAtPrio(prio_level) && + (now - queue_ptr->LeadingPacketEnqueueTime(prio_level)) > + time_to_live) { + QueuedPacket packet = queue_ptr->DequeuePacket(prio_level); + LOG_INFO("Dropping old packet on SSRC: %u seq: %u time in queue: %lld ms", + packet.packet->Ssrc(), packet.packet->SequenceNumber(), + (now - packet.enqueue_time).ms()); + DequeuePacketInternal(packet); + } + if (!queue_ptr->HasPacketsAtPrio(prio_level)) { + iter = queues.erase(iter); + } else { + ++iter; + } + } +} + +} // namespace webrtc diff --git a/src/qos/prioritized_packet_queue.h b/src/qos/prioritized_packet_queue.h new file mode 100644 index 0000000..e2191a7 --- /dev/null +++ b/src/qos/prioritized_packet_queue.h @@ -0,0 +1,197 @@ +/* + * Copyright (c) 2022 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#ifndef MODULES_PACING_PRIORITIZED_PACKET_QUEUE_H_ +#define MODULES_PACING_PRIORITIZED_PACKET_QUEUE_H_ + +#include + +#include +#include +#include +#include +#include +#include + +#include "api/units/data_size.h" +#include "api/units/time_delta.h" +#include "api/units/timestamp.h" +#include "inlined_vector.h" +#include "rtp_packet_to_send.h" +#include "rtp_rtcp_defines.h" + +namespace webrtc { + +// Describes how long time a packet may stay in the queue before being dropped. +struct PacketQueueTTL { + TimeDelta audio_retransmission = TimeDelta::PlusInfinity(); + TimeDelta video_retransmission = TimeDelta::PlusInfinity(); + TimeDelta video = TimeDelta::PlusInfinity(); +}; + +class PrioritizedPacketQueue { + public: + explicit PrioritizedPacketQueue( + Timestamp creation_time, bool prioritize_audio_retransmission = false, + PacketQueueTTL packet_queue_ttl = PacketQueueTTL()); + PrioritizedPacketQueue(const PrioritizedPacketQueue&) = delete; + PrioritizedPacketQueue& operator=(const PrioritizedPacketQueue&) = delete; + + // Add a packet to the queue. The enqueue time is used for queue time stats + // and to report the leading packet enqueue time per packet type. + void Push(Timestamp enqueue_time, std::unique_ptr packet); + + // Remove the next packet from the queue. Packets a prioritized first + // according to packet type, in the following order: + // - audio, retransmissions, video / fec, padding + // For each packet type, we use one FIFO-queue per SSRC and emit from + // those queues in a round-robin fashion. + std::unique_ptr Pop(); + + // Number of packets in the queue. + int SizeInPackets() const; + + // Sum of all payload bytes in the queue, where the payload is calculated + // as `packet->payload_size() + packet->padding_size()`. + DataSize SizeInPayloadBytes() const; + + // Convenience method for `SizeInPackets() == 0`. + bool Empty() const; + + // Total packets in the queue per media type (RtpPacketMediaType values are + // used as lookup index). + const std::array& SizeInPacketsPerRtpPacketMediaType() + const; + + // The enqueue time of the next packet this queue will return via the Pop() + // method, for the given packet type. If queue has no packets, of that type, + // returns Timestamp::MinusInfinity(). + Timestamp LeadingPacketEnqueueTime(RtpPacketMediaType type) const; + Timestamp LeadingPacketEnqueueTimeForRetransmission() const; + + // Enqueue time of the oldest packet in the queue, + // Timestamp::MinusInfinity() if queue is empty. + Timestamp OldestEnqueueTime() const; + + // Average queue time for the packets currently in the queue. + // The queuing time is calculated from Push() to the last UpdateQueueTime() + // call - with any time spent in a paused state subtracted. + // Returns TimeDelta::Zero() for an empty queue. + TimeDelta AverageQueueTime() const; + + // Called during packet processing or when pause stats changes. Since the + // AverageQueueTime() method does not look at the wall time, this method + // needs to be called before querying queue time. + void UpdateAverageQueueTime(Timestamp now); + + // Set the pause state, while `paused` is true queuing time is not counted. + void SetPauseState(bool paused, Timestamp now); + + // Remove any packets matching the given SSRC. + void RemovePacketsForSsrc(uint32_t ssrc); + + // Checks if the queue for the given SSRC has original (retransmissions not + // counted) video packets containing keyframe data. + bool HasKeyframePackets(uint32_t ssrc) const; + + private: + static constexpr int kNumPriorityLevels = 5; + + class QueuedPacket { + public: + DataSize PacketSize() const; + + std::unique_ptr packet; + Timestamp enqueue_time; + std::list::iterator enqueue_time_iterator; + }; + + // Class containing packets for an RTP stream. + // For each priority level, packets are simply stored in a fifo queue. + class StreamQueue { + public: + explicit StreamQueue(Timestamp creation_time); + StreamQueue(StreamQueue&&) = default; + StreamQueue& operator=(StreamQueue&&) = default; + + StreamQueue(const StreamQueue&) = delete; + StreamQueue& operator=(const StreamQueue&) = delete; + + // Enqueue packet at the given priority level. Returns true if the packet + // count for that priority level went from zero to non-zero. + bool EnqueuePacket(QueuedPacket packet, int priority_level); + + QueuedPacket DequeuePacket(int priority_level); + + bool HasPacketsAtPrio(int priority_level) const; + bool IsEmpty() const; + Timestamp LeadingPacketEnqueueTime(int priority_level) const; + Timestamp LastEnqueueTime() const; + bool has_keyframe_packets() const { return num_keyframe_packets_ > 0; } + + std::array, kNumPriorityLevels> DequeueAll(); + + private: + std::deque packets_[kNumPriorityLevels]; + Timestamp last_enqueue_time_; + int num_keyframe_packets_; + }; + + // Remove the packet from the internal state, e.g. queue time / size etc. + void DequeuePacketInternal(QueuedPacket& packet); + + // Check if the queue pointed to by `top_active_prio_level_` is empty and + // if so move it to the lowest non-empty index. + void MaybeUpdateTopPrioLevel(); + + void PurgeOldPacketsAtPriorityLevel(int prio_level, Timestamp now); + + static InlinedVector ToTtlPerPrio( + PacketQueueTTL); + + const bool prioritize_audio_retransmission_; + const InlinedVector time_to_live_per_prio_; + + // Cumulative sum, over all packets, of time spent in the queue. + TimeDelta queue_time_sum_; + // Cumulative sum of time the queue has spent in a paused state. + TimeDelta pause_time_sum_; + // Total number of packets stored in this queue. + int size_packets_; + // Total number of packets stored in this queue per RtpPacketMediaType. + std::array size_packets_per_media_type_; + // Sum of payload sizes for all packts stored in this queue. + DataSize size_payload_; + // The last time queue/pause time sums were updated. + Timestamp last_update_time_; + bool paused_; + + // Last time `streams_` was culled for inactive streams. + Timestamp last_culling_time_; + + // Map from SSRC to packet queues for the associated RTP stream. + std::unordered_map> streams_; + + // For each priority level, a queue of StreamQueues which have at least one + // packet pending for that prio level. + std::deque streams_by_prio_[kNumPriorityLevels]; + + // The first index into `stream_by_prio_` that is non-empty. + int top_active_prio_level_; + + // Ordered list of enqueue times. Additions are always increasing and added to + // the end. QueuedPacket instances have a iterators into this list for fast + // removal. + std::list enqueue_times_; +}; + +} // namespace webrtc + +#endif // MODULES_PACING_PRIORITIZED_PACKET_QUEUE_H_ diff --git a/src/qos/rtp_packet_pacer.h b/src/qos/rtp_packet_pacer.h new file mode 100644 index 0000000..5f12048 --- /dev/null +++ b/src/qos/rtp_packet_pacer.h @@ -0,0 +1,72 @@ +/* + * Copyright (c) 2019 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#ifndef MODULES_PACING_RTP_PACKET_PACER_H_ +#define MODULES_PACING_RTP_PACKET_PACER_H_ + +#include +#include + +#include "api/transport/network_types.h" +#include "api/units/data_rate.h" +#include "api/units/data_size.h" +#include "api/units/time_delta.h" +#include "api/units/timestamp.h" + +namespace webrtc { + +class RtpPacketPacer { + public: + virtual ~RtpPacketPacer() = default; + + virtual void CreateProbeClusters( + std::vector probe_cluster_configs) = 0; + + // Temporarily pause all sending. + virtual void Pause() = 0; + + // Resume sending packets. + virtual void Resume() = 0; + + virtual void SetCongested(bool congested) = 0; + + // Sets the pacing rates. Must be called once before packets can be sent. + virtual void SetPacingRates(DataRate pacing_rate, DataRate padding_rate) = 0; + + // Time since the oldest packet currently in the queue was added. + virtual TimeDelta OldestPacketWaitTime() const = 0; + + // Sum of payload + padding bytes of all packets currently in the pacer queue. + virtual DataSize QueueSizeData() const = 0; + + // Returns the time when the first packet was sent. + virtual std::optional FirstSentPacketTime() const = 0; + + // Returns the expected number of milliseconds it will take to send the + // current packets in the queue, given the current size and bitrate, ignoring + // priority. + virtual TimeDelta ExpectedQueueTime() const = 0; + + // Set the average upper bound on pacer queuing delay. The pacer may send at + // a higher rate than what was configured via SetPacingRates() in order to + // keep ExpectedQueueTimeMs() below `limit_ms` on average. + virtual void SetQueueTimeLimit(TimeDelta limit) = 0; + + // Currently audio traffic is not accounted by pacer and passed through. + // With the introduction of audio BWE audio traffic will be accounted for + // the pacer budget calculation. The audio traffic still will be injected + // at high priority. + virtual void SetAccountForAudioPackets(bool account_for_audio) = 0; + virtual void SetIncludeOverhead() = 0; + virtual void SetTransportOverhead(DataSize overhead_per_packet) = 0; +}; + +} // namespace webrtc +#endif // MODULES_PACING_RTP_PACKET_PACER_H_ diff --git a/src/rtp/rtp_statistics.cpp b/src/rtp/rtp_statistics/rtp_statistics.cpp similarity index 100% rename from src/rtp/rtp_statistics.cpp rename to src/rtp/rtp_statistics/rtp_statistics.cpp diff --git a/src/rtp/rtp_statistics.h b/src/rtp/rtp_statistics/rtp_statistics.h similarity index 100% rename from src/rtp/rtp_statistics.h rename to src/rtp/rtp_statistics/rtp_statistics.h diff --git a/src/transport/ice_transport_controller.cpp b/src/transport/ice_transport_controller.cpp index 830aaeb..7e97b6a 100644 --- a/src/transport/ice_transport_controller.cpp +++ b/src/transport/ice_transport_controller.cpp @@ -52,6 +52,7 @@ void IceTransportController::Create( CreateAudioCodec(); controller_ = std::make_unique(); + packet_sender_ = std::make_unique(ice_agent, webrtc_clock_); resolution_adapter_ = std::make_unique(); video_channel_send_ = std::make_unique( @@ -471,7 +472,6 @@ void IceTransportController::PostUpdates(webrtc::NetworkControlUpdate update) { target_bitrate_ = target_bitrate; int width, height, target_width, target_height; video_encoder_->GetResolution(&width, &height); - if (0 == resolution_adapter_->GetResolution(target_bitrate_, width, height, &target_width, &target_height)) { @@ -480,19 +480,20 @@ void IceTransportController::PostUpdates(webrtc::NetworkControlUpdate update) { target_height_ = target_height; b_force_i_frame_ = true; - // LOG_INFO("Set target resolution [{}x{}]", target_width_.value(), - // target_height_.value()); } } else if (target_width_.has_value() && target_height_.has_value()) { target_width_.reset(); target_height_.reset(); - // LOG_INFO("Use original resolution [{}x{}]", source_width_, - // source_height_); } video_encoder_->SetTargetBitrate(target_bitrate_); LOG_WARN("Set target bitrate [{}]bps", target_bitrate_); } } + + if (!update.probe_cluster_configs.empty()) { + packet_sender_->CreateProbeClusters( + std::move(update.probe_cluster_configs)); + } } void IceTransportController::UpdateControlState() { diff --git a/src/transport/ice_transport_controller.h b/src/transport/ice_transport_controller.h index 871be83..a8b2290 100644 --- a/src/transport/ice_transport_controller.h +++ b/src/transport/ice_transport_controller.h @@ -14,12 +14,14 @@ #include "audio_channel_send.h" #include "audio_decoder.h" #include "audio_encoder.h" +#include "bitrate_prober.h" #include "clock/system_clock.h" #include "congestion_control.h" #include "congestion_control_feedback.h" #include "data_channel_receive.h" #include "data_channel_send.h" #include "ice_agent.h" +#include "packet_sender.h" #include "resolution_adapter.h" #include "transport_feedback_adapter.h" #include "video_channel_receive.h" @@ -103,7 +105,7 @@ class IceTransportController std::shared_ptr ice_agent_ = nullptr; std::shared_ptr ice_io_statistics_ = nullptr; std::unique_ptr rtp_packetizer_ = nullptr; - std::unique_ptr rtp_video_sender_ = nullptr; + std::unique_ptr packet_sender_ = nullptr; std::string remote_user_id_; void *user_data_ = nullptr; @@ -112,6 +114,7 @@ class IceTransportController std::shared_ptr webrtc_clock_ = nullptr; webrtc::TransportFeedbackAdapter transport_feedback_adapter_; std::unique_ptr controller_; + BitrateProber prober_; private: std::unique_ptr video_encoder_ = nullptr; diff --git a/src/transport/packet_sender.cpp b/src/transport/packet_sender.cpp new file mode 100644 index 0000000..01d767c --- /dev/null +++ b/src/transport/packet_sender.cpp @@ -0,0 +1,118 @@ + +#include "packet_sender.h" + +#include "log.h" + +PacketSender::PacketSender(std::shared_ptr ice_agent, + std::shared_ptr clock) + : ice_agent_(ice_agent), + clock_(clock), + pacing_controller_(clock.get(), this) {} + +PacketSender::~PacketSender() {} + +// int PacketSender::SendPacket(const char *data, size_t size) { +// LOG_INFO("Send packet, size: %d", size); +// return ice_agent_->Send(data, size); +// } + +// void PacketSender::CreateProbeClusters( +// std::vector probe_cluster_configs) { +// pacing_controller_.CreateProbeClusters(probe_cluster_configs); +// MaybeScheduleProcessPackets(); +// } + +// void PacketSender::MaybeScheduleProcessPackets() { +// if (!processing_packets_) +// MaybeProcessPackets(webrtc::Timestamp::MinusInfinity()); +// } + +// void PacketSender::MaybeProcessPackets( +// webrtc::Timestamp scheduled_process_time) { +// if (is_shutdown_ || !is_started_) { +// return; +// } + +// // Protects against re-entry from transport feedback calling into the task +// // queue pacer. +// processing_packets_ = true; +// auto cleanup = std::unique_ptr>( +// nullptr, [this](void *) { processing_packets_ = false; }); + +// webrtc::Timestamp next_send_time = pacing_controller_.NextSendTime(); +// const webrtc::Timestamp now = clock_->CurrentTime(); +// webrtc::TimeDelta early_execute_margin = +// pacing_controller_.IsProbing() +// ? webrtc::PacingController::kMaxEarlyProbeProcessing +// : webrtc::TimeDelta::Zero(); + +// // Process packets and update stats. +// while (next_send_time <= now + early_execute_margin) { +// pacing_controller_.ProcessPackets(); +// next_send_time = pacing_controller_.NextSendTime(); + +// // Probing state could change. Get margin after process packets. +// early_execute_margin = +// pacing_controller_.IsProbing() +// ? webrtc::PacingController::kMaxEarlyProbeProcessing +// : webrtc::TimeDelta::Zero(); +// } +// UpdateStats(); + +// // Ignore retired scheduled task, otherwise reset `next_process_time_`. +// if (scheduled_process_time.IsFinite()) { +// if (scheduled_process_time != next_process_time_) { +// return; +// } +// next_process_time_ = webrtc::Timestamp::MinusInfinity(); +// } + +// // Do not hold back in probing. +// webrtc::TimeDelta hold_back_window = webrtc::TimeDelta::Zero(); +// if (!pacing_controller_.IsProbing()) { +// hold_back_window = max_hold_back_window_; +// webrtc::DataRate pacing_rate = pacing_controller_.pacing_rate(); +// if (max_hold_back_window_in_packets_ != kNoPacketHoldback && +// !pacing_rate.IsZero() && +// packet_size_.filtered() != rtc::ExpFilter::kValueUndefined) { +// webrtc::TimeDelta avg_packet_send_time = +// webrtc::DataSize::Bytes(packet_size_.filtered()) / pacing_rate; +// hold_back_window = +// std::min(hold_back_window, +// avg_packet_send_time * max_hold_back_window_in_packets_); +// } +// } + +// // Calculate next process time. +// webrtc::TimeDelta time_to_next_process = +// std::max(hold_back_window, next_send_time - now - +// early_execute_margin); +// next_send_time = now + time_to_next_process; + +// // If no in flight task or in flight task is later than `next_send_time`, +// // schedule a new one. Previous in flight task will be retired. +// if (next_process_time_.IsMinusInfinity() || +// next_process_time_ > next_send_time) { +// // Prefer low precision if allowed and not probing. +// task_queue_->PostDelayedHighPrecisionTask( +// SafeTask( +// safety_.flag(), +// [this, next_send_time]() { MaybeProcessPackets(next_send_time); +// }), +// time_to_next_process.RoundUpTo(webrtc::TimeDelta::Millis(1))); +// next_process_time_ = next_send_time; +// } +// } + +// void PacketSender::UpdateStats() { +// Stats new_stats; +// new_stats.expected_queue_time = pacing_controller_.ExpectedQueueTime(); +// new_stats.first_sent_packet_time = +// pacing_controller_.FirstSentPacketTime(); +// new_stats.oldest_packet_enqueue_time = +// pacing_controller_.OldestPacketEnqueueTime(); +// new_stats.queue_size = pacing_controller_.QueueSizeData(); +// OnStatsUpdated(new_stats); +// } + +// PacketSender::Stats PacketSender::GetStats() const { return current_stats_; } \ No newline at end of file diff --git a/src/transport/packet_sender.h b/src/transport/packet_sender.h new file mode 100644 index 0000000..1291357 --- /dev/null +++ b/src/transport/packet_sender.h @@ -0,0 +1,116 @@ +/* + * @Author: DI JUNKUN + * @Date: 2025-03-12 + * Copyright (c) 2025 by DI JUNKUN, All Rights Reserved. + */ + +#ifndef _PACKET_SENDER_H_ +#define _PACKET_SENDER_H_ + +#include + +#include "api/array_view.h" +#include "api/transport/network_types.h" +#include "api/units/data_rate.h" +#include "api/units/data_size.h" +#include "api/units/time_delta.h" +#include "api/units/timestamp.h" +#include "ice_agent.h" +#include "pacing_controller.h" +#include "rtc_base/numerics/exp_filter.h" +#include "rtp_packet_pacer.h" +#include "rtp_packet_to_send.h" + +class PacketSender : public webrtc::RtpPacketPacer, + public webrtc::PacingController::PacketSender { + public: + PacketSender(std::shared_ptr ice_agent, + std::shared_ptr clock); + ~PacketSender(); + + int SendPacket(const char* data, size_t size); + + public: + void CreateProbeClusters( + std::vector probe_cluster_configs) override{}; + + // Temporarily pause all sending. + void Pause() override{}; + + // Resume sending packets. + void Resume() override{}; + + void SetCongested(bool congested) override{}; + + // Sets the pacing rates. Must be called once before packets can be sent. + void SetPacingRates(webrtc::DataRate pacing_rate, + webrtc::DataRate padding_rate) override{}; + + // Time since the oldest packet currently in the queue was added. + webrtc::TimeDelta OldestPacketWaitTime() const override { + return webrtc::TimeDelta::Zero(); + }; + + // Sum of payload + padding bytes of all packets currently in the pacer queue. + webrtc::DataSize QueueSizeData() const override { + return webrtc::DataSize::Zero(); + }; + + // Returns the time when the first packet was sent. + std::optional FirstSentPacketTime() const override { + return {}; + } + + // Returns the expected number of milliseconds it will take to send the + // current packets in the queue, given the current size and bitrate, ignoring + // priority. + webrtc::TimeDelta ExpectedQueueTime() const override { + return webrtc::TimeDelta::Zero(); + }; + + // Set the average upper bound on pacer queuing delay. The pacer may send at + // a higher rate than what was configured via SetPacingRates() in order to + // keep ExpectedQueueTimeMs() below `limit_ms` on average. + void SetQueueTimeLimit(webrtc::TimeDelta limit) override{}; + + // Currently audio traffic is not accounted by pacer and passed through. + // With the introduction of audio BWE audio traffic will be accounted for + // the pacer budget calculation. The audio traffic still will be injected + // at high priority. + void SetAccountForAudioPackets(bool account_for_audio) override{}; + void SetIncludeOverhead() override{}; + void SetTransportOverhead(webrtc::DataSize overhead_per_packet) override{}; + + public: + void SendPacket(std::unique_ptr packet, + const webrtc::PacedPacketInfo& cluster_info) override {} + // Should be called after each call to SendPacket(). + std::vector> FetchFec() override { + return {}; + } + std::vector> GeneratePadding( + webrtc::DataSize size) override { + return {}; + } + // TODO(bugs.webrtc.org/1439830): Make pure once subclasses adapt. + void OnBatchComplete() override {} + + // TODO(bugs.webrtc.org/11340): Make pure once downstream projects + // have been updated. + void OnAbortedRetransmissions( + uint32_t /* ssrc */, + rtc::ArrayView /* sequence_numbers */) {} + std::optional GetRtxSsrcForMedia( + uint32_t /* ssrc */) const override { + return std::nullopt; + } + + private: + std::shared_ptr ice_agent_ = nullptr; + webrtc::PacingController pacing_controller_; + + private: + std::shared_ptr clock_ = nullptr; +}; + +#endif \ No newline at end of file diff --git a/xmake.lua b/xmake.lua index 2feff3b..99d42f9 100644 --- a/xmake.lua +++ b/xmake.lua @@ -113,10 +113,10 @@ target("ws") target("rtp") set_kind("object") add_deps("log", "common", "frame", "ringbuffer", "thread", "rtcp", "fec", "statistics") - add_files("src/rtp/*.cpp", + add_files("src/rtp/rtp_statistics/*.cpp", "src/rtp/rtp_packet/*.cpp", "src/rtp/rtp_packetizer/*.cpp") - add_includedirs("src/rtp", + add_includedirs("src/rtp/rtp_statistics", "src/rtp/rtp_packet", "src/rtp/rtp_packetizer", {public = true}) @@ -148,7 +148,7 @@ target("channel") target("transport") set_kind("object") - add_deps("log", "ws", "ice", "channel", "rtp", "rtcp", "statistics", "media") + add_deps("log", "ws", "ice", "channel", "rtp", "rtcp", "statistics", "media", "qos") add_files("src/transport/*.cpp") add_includedirs("src/transport", {public = true})