diff --git a/src/channel/video_channel_send.cpp b/src/channel/video_channel_send.cpp index b75c7cb..9aff283 100644 --- a/src/channel/video_channel_send.cpp +++ b/src/channel/video_channel_send.cpp @@ -16,7 +16,7 @@ void VideoChannelSend::Initialize(RtpPacket::PAYLOAD_TYPE payload_type) { rtp_video_sender_ = std::make_unique(ice_io_statistics_); rtp_video_sender_->SetSendDataFunc( - [this](const char *data, size_t size) -> int { + [this](const char* data, size_t size) -> int { if (!ice_agent_) { LOG_ERROR("ice_agent_ is nullptr"); return -1; @@ -44,14 +44,43 @@ void VideoChannelSend::Destroy() { } } -int VideoChannelSend::SendVideo(char *data, size_t size) { +int VideoChannelSend::SendVideo(char* data, size_t size) { std::vector packets; if (rtp_video_sender_) { if (video_rtp_codec_) { - video_rtp_codec_->Encode((uint8_t *)data, (uint32_t)size, packets); + video_rtp_codec_->Encode((uint8_t*)data, (uint32_t)size, packets); } rtp_video_sender_->Enqueue(packets); } return 0; +} + +void VideoChannelSend::OnCongestionControlFeedback( + int64_t recv_ts, const CongestionControlFeedback& feedback) { + ++feedback_count_; + std::optional feedback_msg = + transport_feedback_adapter_.ProcessCongestionControlFeedback(feedback, + recv_ts); + if (feedback_msg) { + HandleTransportPacketsFeedback(*feedback_msg); + } +} + +void VideoChannelSend::HandleTransportPacketsFeedback( + const TransportPacketsFeedback& feedback) { + // if (transport_is_ecn_capable_) { + // // If transport does not support ECN, packets should not be sent as + // ECT(1). + // // TODO: bugs.webrtc.org/42225697 - adapt to ECN feedback and continue to + // // send packets as ECT(1) if transport is ECN capable. + // transport_is_ecn_capable_ = false; + // LOG_INFO("Transport is {} ECN capable. Stop sending ECT(1)", + // (feedback.transport_supports_ecn ? "" : " not ")); + // } + // if (controller_) + // PostUpdates(controller_->OnTransportPacketsFeedback(feedback)); + + // // Only update outstanding data if any packet is first time acked. + // UpdateCongestedState(); } \ No newline at end of file diff --git a/src/channel/video_channel_send.h b/src/channel/video_channel_send.h index f2e9622..89c8874 100644 --- a/src/channel/video_channel_send.h +++ b/src/channel/video_channel_send.h @@ -10,6 +10,7 @@ #include "ice_agent.h" #include "rtp_codec.h" #include "rtp_video_sender.h" +#include "transport_feedback_adapter.h" class VideoChannelSend { public: @@ -22,13 +23,26 @@ class VideoChannelSend { void Initialize(RtpPacket::PAYLOAD_TYPE payload_type); void Destroy(); - int SendVideo(char *data, size_t size); + int SendVideo(char* data, size_t size); + + void OnCongestionControlFeedback(int64_t recv_ts, + const CongestionControlFeedback& feedback); + + void HandleTransportPacketsFeedback(const TransportPacketsFeedback& feedback); private: std::shared_ptr ice_agent_ = nullptr; std::shared_ptr ice_io_statistics_ = nullptr; std::unique_ptr video_rtp_codec_ = nullptr; std::unique_ptr rtp_video_sender_ = nullptr; + + private: + int64_t current_offset_ = std::numeric_limits::min(); + // Used by RFC 8888 congestion control feedback to track base time. + std::optional last_feedback_compact_ntp_time_; + int feedback_count_ = 0; + + TransportFeedbackAdapter transport_feedback_adapter_; }; #endif \ No newline at end of file diff --git a/src/common/network_route.h b/src/common/network_route.h new file mode 100644 index 0000000..6a08132 --- /dev/null +++ b/src/common/network_route.h @@ -0,0 +1,97 @@ +/* + * @Author: DI JUNKUN + * @Date: 2025-01-13 + * Copyright (c) 2025 by DI JUNKUN, All Rights Reserved. + */ + +#ifndef _NETWORK_ROUTE_H_ +#define _NETWORK_ROUTE_H_ + +#include +#include + +struct NetworkRoute; + +class RouteEndpoint { + public: + enum AdapterType { + // This enum resembles the one in Chromium net::ConnectionType. + ADAPTER_TYPE_UNKNOWN = 0, + ADAPTER_TYPE_ETHERNET = 1 << 0, + ADAPTER_TYPE_WIFI = 1 << 1, + ADAPTER_TYPE_CELLULAR = 1 << 2, // This is CELLULAR of unknown type. + ADAPTER_TYPE_VPN = 1 << 3, + ADAPTER_TYPE_LOOPBACK = 1 << 4, + // ADAPTER_TYPE_ANY is used for a network, which only contains a single "any + // address" IP address (INADDR_ANY for IPv4 or in6addr_any for IPv6), and + // can + // use any/all network interfaces. Whereas ADAPTER_TYPE_UNKNOWN is used + // when the network uses a specific interface/IP, but its interface type can + // not be determined or not fit in this enum. + ADAPTER_TYPE_ANY = 1 << 5, + ADAPTER_TYPE_CELLULAR_2G = 1 << 6, + ADAPTER_TYPE_CELLULAR_3G = 1 << 7, + ADAPTER_TYPE_CELLULAR_4G = 1 << 8, + ADAPTER_TYPE_CELLULAR_5G = 1 << 9 + }; + + public: + RouteEndpoint() {} // Used by tests. + RouteEndpoint(AdapterType adapter_type, uint16_t adapter_id, + uint16_t network_id, bool uses_turn) + : adapter_type_(adapter_type), + adapter_id_(adapter_id), + network_id_(network_id), + uses_turn_(uses_turn) {} + + RouteEndpoint(const RouteEndpoint&) = default; + RouteEndpoint& operator=(const RouteEndpoint&) = default; + + bool operator==(const RouteEndpoint& other) const { + return adapter_type_ == other.adapter_type_ && + adapter_id_ == other.adapter_id_ && + network_id_ == other.network_id_ && uses_turn_ == other.uses_turn_; + } + + // Used by tests. + static RouteEndpoint CreateWithNetworkId(uint16_t network_id) { + return RouteEndpoint(ADAPTER_TYPE_UNKNOWN, + /* adapter_id = */ 0, network_id, + /* uses_turn = */ false); + } + RouteEndpoint CreateWithTurn(bool uses_turn) const { + return RouteEndpoint(adapter_type_, adapter_id_, network_id_, uses_turn); + } + + AdapterType adapter_type() const { return adapter_type_; } + uint16_t adapter_id() const { return adapter_id_; } + uint16_t network_id() const { return network_id_; } + bool uses_turn() const { return uses_turn_; } + + private: + AdapterType adapter_type_ = ADAPTER_TYPE_UNKNOWN; + uint16_t adapter_id_ = 0; + uint16_t network_id_ = 0; + bool uses_turn_ = false; +}; + +struct NetworkRoute { + bool connected = false; + RouteEndpoint local; + RouteEndpoint remote; + // Last packet id sent on the PREVIOUS route. + int last_sent_packet_id = -1; + // The overhead in bytes from IP layer and above. + // This is the maximum of any part of the route. + int packet_overhead = 0; + + bool operator==(const NetworkRoute& other) const { + return connected == other.connected && local == other.local && + remote == other.remote && packet_overhead == other.packet_overhead && + last_sent_packet_id == other.last_sent_packet_id; + } + + bool operator!=(const NetworkRoute& other) { return !operator==(other); } +}; + +#endif \ No newline at end of file diff --git a/src/common/network_types.h b/src/common/network_types.h new file mode 100644 index 0000000..d8e3ea4 --- /dev/null +++ b/src/common/network_types.h @@ -0,0 +1,161 @@ +/* + * @Author: DI JUNKUN + * @Date: 2025-01-13 + * Copyright (c) 2025 by DI JUNKUN, All Rights Reserved. + */ + +#ifndef _NETWORK_TYPES_H_ +#define _NETWORK_TYPES_H_ + +#include +#include +#include + +#include "enc_mark.h" + +struct NetworkEstimate { + int64_t at_time = std::numeric_limits::max(); + // Deprecated, use TargetTransferRate::target_rate instead. + int64_t bandwidth = std::numeric_limits::max(); + int64_t round_trip_time = std::numeric_limits::max(); + int64_t bwe_period = std::numeric_limits::max(); + + float loss_rate_ratio = 0; +}; + +struct TargetTransferRate { + int64_t at_time = std::numeric_limits::max(); + // The estimate on which the target rate is based on. + NetworkEstimate network_estimate; + int64_t target_rate = 0; + int64_t stable_target_rate = 0; + double cwnd_reduce_ratio = 0; +}; + +struct NetworkControlUpdate { + NetworkControlUpdate() = default; + NetworkControlUpdate(const NetworkControlUpdate&) = default; + ~NetworkControlUpdate() = default; + + bool has_updates() const { + // return congestion_window.has_value() || pacer_config.has_value() || + // !probe_cluster_configs.empty() || + return target_rate.has_value(); + } + + std::optional congestion_window; + // std::optional pacer_config; + // std::vector probe_cluster_configs; + std::optional target_rate; +}; + +struct PacedPacketInfo { + PacedPacketInfo() = default; + PacedPacketInfo(int probe_cluster_id, int probe_cluster_min_probes, + int probe_cluster_min_bytes) + : probe_cluster_id(probe_cluster_id), + probe_cluster_min_probes(probe_cluster_min_probes), + probe_cluster_min_bytes(probe_cluster_min_bytes) {} + + bool operator==(const PacedPacketInfo& rhs) const { + return send_bitrate == rhs.send_bitrate && + probe_cluster_id == rhs.probe_cluster_id && + probe_cluster_min_probes == rhs.probe_cluster_min_probes && + probe_cluster_min_bytes == rhs.probe_cluster_min_bytes; + } + + // TODO(srte): Move probing info to a separate, optional struct. + static constexpr int kNotAProbe = -1; + int64_t send_bitrate = 0; + int probe_cluster_id = kNotAProbe; + int probe_cluster_min_probes = -1; + int probe_cluster_min_bytes = -1; + int probe_cluster_bytes_sent = 0; +}; + +struct SentPacket { + int64_t send_time = std::numeric_limits::max(); + // Size of packet with overhead up to IP layer. + int64_t size = 0; + // Size of preceeding packets that are not part of feedback. + int64_t prior_unacked_data = 0; + // Probe cluster id and parameters including bitrate, number of packets and + // number of bytes. + PacedPacketInfo pacing_info; + // True if the packet is an audio packet, false for video, padding, RTX etc. + bool audio = false; + // Transport independent sequence number, any tracked packet should have a + // sequence number that is unique over the whole call and increasing by 1 for + // each packet. + int64_t sequence_number; + // Tracked data in flight when the packet was sent, excluding unacked data. + int64_t data_in_flight = 0; +}; + +struct PacketResult { + class ReceiveTimeOrder { + public: + bool operator()(const PacketResult& lhs, const PacketResult& rhs); + }; + + PacketResult() = default; + PacketResult(const PacketResult&) = default; + ~PacketResult() = default; + + inline bool IsReceived() const { + return receive_time != std::numeric_limits::max(); + } + + SentPacket sent_packet; + int64_t receive_time = std::numeric_limits::max(); + EcnMarking ecn = EcnMarking::kNotEct; +}; + +struct TransportPacketsFeedback { + TransportPacketsFeedback() = default; + TransportPacketsFeedback(const TransportPacketsFeedback& other) = default; + ~TransportPacketsFeedback() = default; + + int64_t feedback_time = std::numeric_limits::max(); + int64_t data_in_flight = 0; + bool transport_supports_ecn = false; + std::vector packet_feedbacks; + + // Arrival times for messages without send time information. + std::vector sendless_arrival_times; + + std::vector ReceivedWithSendInfo() const { + std::vector res; + for (const PacketResult& fb : packet_feedbacks) { + if (fb.IsReceived()) { + res.push_back(fb); + } + } + return res; + } + std::vector LostWithSendInfo() const { + std::vector res; + for (const PacketResult& fb : packet_feedbacks) { + if (!fb.IsReceived()) { + res.push_back(fb); + } + } + return res; + } + + std::vector PacketsWithFeedback() const { + return packet_feedbacks; + } + std::vector SortedByReceiveTime() const { + std::vector res; + for (const PacketResult& fb : packet_feedbacks) { + if (fb.IsReceived()) { + res.push_back(fb); + } + } + std::sort(res.begin(), res.end(), PacketResult::ReceiveTimeOrder()); + return res; + } +}; + +#endif \ No newline at end of file diff --git a/src/qos/congestion_control.cpp b/src/qos/congestion_control.cpp index 060b616..0003266 100644 --- a/src/qos/congestion_control.cpp +++ b/src/qos/congestion_control.cpp @@ -1,5 +1,201 @@ #include "congestion_control.h" +#include +#include +#include + +#include "log.h" + +constexpr int64_t kLossUpdateInterval = 1000; + +// Pacing-rate relative to our target send rate. +// Multiplicative factor that is applied to the target bitrate to calculate +// the number of bytes that can be transmitted per interval. +// Increasing this factor will result in lower delays in cases of bitrate +// overshoots from the encoder. +constexpr float kDefaultPaceMultiplier = 2.5f; + +// If the probe result is far below the current throughput estimate +// it's unlikely that the probe is accurate, so we don't want to drop too far. +// However, if we actually are overusing, we want to drop to something slightly +// below the current throughput estimate to drain the network queues. +constexpr double kProbeDropThroughputFraction = 0.85; + CongestionControl::CongestionControl() {} -CongestionControl::~CongestionControl() {} \ No newline at end of file +CongestionControl::~CongestionControl() {} + +NetworkControlUpdate CongestionControl::OnTransportPacketsFeedback( + TransportPacketsFeedback report) { + if (report.packet_feedbacks.empty()) { + // TODO(bugs.webrtc.org/10125): Design a better mechanism to safe-guard + // against building very large network queues. + return NetworkControlUpdate(); + } + + if (congestion_window_pushback_controller_) { + congestion_window_pushback_controller_->UpdateOutstandingData( + report.data_in_flight); + } + int64_t max_feedback_rtt = std::numeric_limits::min(); + int64_t min_propagation_rtt = std::numeric_limits::max(); + int64_t max_recv_time = std::numeric_limits::min(); + + std::vector feedbacks = report.ReceivedWithSendInfo(); + for (const auto& feedback : feedbacks) + max_recv_time = std::max(max_recv_time, feedback.receive_time); + + for (const auto& feedback : feedbacks) { + int64_t feedback_rtt = + report.feedback_time - feedback.sent_packet.send_time; + int64_t min_pending_time = max_recv_time - feedback.receive_time; + int64_t propagation_rtt = feedback_rtt - min_pending_time; + max_feedback_rtt = std::max(max_feedback_rtt, feedback_rtt); + min_propagation_rtt = std::min(min_propagation_rtt, propagation_rtt); + } + + if (max_feedback_rtt != std::numeric_limits::min() && + min_propagation_rtt != std::numeric_limits::max()) { + feedback_max_rtts_.push_back(max_feedback_rtt); + const size_t kMaxFeedbackRttWindow = 32; + if (feedback_max_rtts_.size() > kMaxFeedbackRttWindow) + feedback_max_rtts_.pop_front(); + // TODO(srte): Use time since last unacknowledged packet. + // bandwidth_estimation_->UpdatePropagationRtt(report.feedback_time, + // min_propagation_rtt); + } + if (packet_feedback_only_) { + if (!feedback_max_rtts_.empty()) { + int64_t sum_rtt_ms = + std::accumulate(feedback_max_rtts_.begin(), feedback_max_rtts_.end(), + static_cast(0)); + // int64_t mean_rtt_ms = sum_rtt_ms / feedback_max_rtts_.size(); + // if (delay_based_bwe_) delay_based_bwe_->OnRttUpdate(mean_rtt_ms); + } + + int64_t feedback_min_rtt = std::numeric_limits::max(); + for (const auto& packet_feedback : feedbacks) { + int64_t pending_time = max_recv_time - packet_feedback.receive_time; + int64_t rtt = report.feedback_time - + packet_feedback.sent_packet.send_time - pending_time; + // Value used for predicting NACK round trip time in FEC controller. + feedback_min_rtt = std::min(rtt, feedback_min_rtt); + } + if (feedback_min_rtt != std::numeric_limits::max() && + feedback_min_rtt != std::numeric_limits::min()) { + // bandwidth_estimation_->UpdateRtt(feedback_min_rtt, + // report.feedback_time); + } + + expected_packets_since_last_loss_update_ += + report.PacketsWithFeedback().size(); + for (const auto& packet_feedback : report.PacketsWithFeedback()) { + if (!packet_feedback.IsReceived()) + lost_packets_since_last_loss_update_ += 1; + } + if (report.feedback_time > next_loss_update_) { + next_loss_update_ = report.feedback_time + kLossUpdateInterval; + // bandwidth_estimation_->UpdatePacketsLost( + // lost_packets_since_last_loss_update_, + // expected_packets_since_last_loss_update_, report.feedback_time); + expected_packets_since_last_loss_update_ = 0; + lost_packets_since_last_loss_update_ = 0; + } + } + // std::optional alr_start_time = + // alr_detector_->GetApplicationLimitedRegionStartTime(); + + // if (previously_in_alr_ && !alr_start_time.has_value()) { + // int64_t now_ms = report.feedback_time; + // acknowledged_bitrate_estimator_->SetAlrEndedTime(report.feedback_time); + // probe_controller_->SetAlrEndedTimeMs(now_ms); + // } + // previously_in_alr_ = alr_start_time.has_value(); + // acknowledged_bitrate_estimator_->IncomingPacketFeedbackVector( + // report.SortedByReceiveTime()); + // auto acknowledged_bitrate = acknowledged_bitrate_estimator_->bitrate(); + // bandwidth_estimation_->SetAcknowledgedRate(acknowledged_bitrate, + // report.feedback_time); + for (const auto& feedback : report.SortedByReceiveTime()) { + if (feedback.sent_packet.pacing_info.probe_cluster_id != + PacedPacketInfo::kNotAProbe) { + // probe_bitrate_estimator_->HandleProbeAndEstimateBitrate(feedback); + } + } + + // if (network_estimator_) { + // network_estimator_->OnTransportPacketsFeedback(report); + // // SetNetworkStateEstimate(network_estimator_->GetCurrentEstimate()); + // } + // std::optional probe_bitrate = + // probe_bitrate_estimator_->FetchAndResetLastEstimatedBitrate(); + // if (ignore_probes_lower_than_network_estimate_ && probe_bitrate && + // estimate_ && *probe_bitrate < delay_based_bwe_->last_estimate() && + // *probe_bitrate < estimate_->link_capacity_lower) { + // probe_bitrate.reset(); + // } + // if (limit_probes_lower_than_throughput_estimate_ && probe_bitrate && + // acknowledged_bitrate) { + // Limit the backoff to something slightly below the acknowledged + // bitrate. ("Slightly below" because we want to drain the queues + // if we are actually overusing.) + // The acknowledged bitrate shouldn't normally be higher than the delay + // based estimate, but it could happen e.g. due to packet bursts or + // encoder overshoot. We use std::min to ensure that a probe result + // below the current BWE never causes an increase. + // int64_t limit = + // std::min(delay_based_bwe_->last_estimate(), + // *acknowledged_bitrate * kProbeDropThroughputFraction); + // probe_bitrate = std::max(*probe_bitrate, limit); + // } + + NetworkControlUpdate update; + bool recovered_from_overuse = false; + + // DelayBasedBwe::Result result; + // result = delay_based_bwe_->IncomingPacketFeedbackVector( + // report, acknowledged_bitrate, probe_bitrate, estimate_, + // alr_start_time.has_value()); + + // if (result.updated) { + // if (result.probe) { + // bandwidth_estimation_->SetSendBitrate(result.target_bitrate, + // report.feedback_time); + // } + // Since SetSendBitrate now resets the delay-based estimate, we have to + // call UpdateDelayBasedEstimate after SetSendBitrate. + // bandwidth_estimation_->UpdateDelayBasedEstimate(report.feedback_time, + // result.target_bitrate); + // } + // bandwidth_estimation_->UpdateLossBasedEstimator( + // report, result.delay_detector_state, probe_bitrate, + // alr_start_time.has_value()); + // if (result.updated) { + // // Update the estimate in the ProbeController, in case we want to probe. + // MaybeTriggerOnNetworkChanged(&update, report.feedback_time); + // } + + // recovered_from_overuse = result.recovered_from_overuse; + + // if (recovered_from_overuse) { + // probe_controller_->SetAlrStartTimeMs(alr_start_time); + // auto probes = probe_controller_->RequestProbe(report.feedback_time); + // update.probe_cluster_configs.insert(update.probe_cluster_configs.end(), + // probes.begin(), probes.end()); + // } + + // No valid RTT could be because send-side BWE isn't used, in which case + // we don't try to limit the outstanding packets. + // if (rate_control_settings_.UseCongestionWindow() && + // max_feedback_rtt.IsFinite()) { + // UpdateCongestionWindowSize(); + // } + if (congestion_window_pushback_controller_ && current_data_window_) { + congestion_window_pushback_controller_->SetDataWindow( + *current_data_window_); + } else { + update.congestion_window = current_data_window_; + } + + return update; +} \ No newline at end of file diff --git a/src/qos/congestion_control.h b/src/qos/congestion_control.h index babe027..ca6e7db 100644 --- a/src/qos/congestion_control.h +++ b/src/qos/congestion_control.h @@ -1,12 +1,35 @@ #ifndef _CONGESTION_CONTROL_H_ #define _CONGESTION_CONTROL_H_ +#include +#include + +#include "congestion_window_pushback_controller.h" +#include "network_types.h" + class CongestionControl { public: CongestionControl(); ~CongestionControl(); + public: + NetworkControlUpdate OnTransportPacketsFeedback( + TransportPacketsFeedback report); + private: + const std::unique_ptr + congestion_window_pushback_controller_; + + private: + std::deque feedback_max_rtts_; + // std::unique_ptr bandwidth_estimation_; + int expected_packets_since_last_loss_update_ = 0; + int lost_packets_since_last_loss_update_ = 0; + int64_t next_loss_update_ = std::numeric_limits::min(); + const bool packet_feedback_only_ = false; + bool previously_in_alr_ = false; + // const bool limit_probes_lower_than_throughput_estimate_; + std::optional current_data_window_; }; #endif \ No newline at end of file diff --git a/src/qos/congestion_window_pushback_controller.cpp b/src/qos/congestion_window_pushback_controller.cpp new file mode 100644 index 0000000..da46937 --- /dev/null +++ b/src/qos/congestion_window_pushback_controller.cpp @@ -0,0 +1,57 @@ +/* + * Copyright (c) 2018 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#include "congestion_window_pushback_controller.h" + +#include +#include + +CongestionWindowPushbackController::CongestionWindowPushbackController() {} + +void CongestionWindowPushbackController::UpdateOutstandingData( + int64_t outstanding_bytes) { + outstanding_bytes_ = outstanding_bytes; +} +void CongestionWindowPushbackController::UpdatePacingQueue( + int64_t pacing_bytes) { + pacing_bytes_ = pacing_bytes; +} + +void CongestionWindowPushbackController::SetDataWindow(int64_t data_window) { + current_data_window_ = data_window; +} + +uint32_t CongestionWindowPushbackController::UpdateTargetBitrate( + uint32_t bitrate_bps) { + if (!current_data_window_ || current_data_window_ == 0) return bitrate_bps; + int64_t total_bytes = outstanding_bytes_; + if (add_pacing_) total_bytes += pacing_bytes_; + double fill_ratio = + total_bytes / static_cast(current_data_window_.value()); + if (fill_ratio > 1.5) { + encoding_rate_ratio_ *= 0.9; + } else if (fill_ratio > 1) { + encoding_rate_ratio_ *= 0.95; + } else if (fill_ratio < 0.1) { + encoding_rate_ratio_ = 1.0; + } else { + encoding_rate_ratio_ *= 1.05; + encoding_rate_ratio_ = std::min(encoding_rate_ratio_, 1.0); + } + uint32_t adjusted_target_bitrate_bps = + static_cast(bitrate_bps * encoding_rate_ratio_); + + // Do not adjust below the minimum pushback bitrate but do obey if the + // original estimate is below it. + bitrate_bps = adjusted_target_bitrate_bps < min_pushback_target_bitrate_bps_ + ? std::min(bitrate_bps, min_pushback_target_bitrate_bps_) + : adjusted_target_bitrate_bps; + return bitrate_bps; +} diff --git a/src/qos/congestion_window_pushback_controller.h b/src/qos/congestion_window_pushback_controller.h new file mode 100644 index 0000000..b4de439 --- /dev/null +++ b/src/qos/congestion_window_pushback_controller.h @@ -0,0 +1,36 @@ +/* + * @Author: DI JUNKUN + * @Date: 2025-01-13 + * Copyright (c) 2025 by DI JUNKUN, All Rights Reserved. + */ + +#ifndef _CONGESTION_WINDOW_PUSHBACK_CONTROLLER_H_ +#define _CONGESTION_WINDOW_PUSHBACK_CONTROLLER_H_ + +#include + +#include + +// This class enables pushback from congestion window directly to video encoder. +// When the congestion window is filling up, the video encoder target bitrate +// will be reduced accordingly to accommodate the network changes. To avoid +// pausing video too frequently, a minimum encoder target bitrate threshold is +// used to prevent video pause due to a full congestion window. +class CongestionWindowPushbackController { + public: + explicit CongestionWindowPushbackController(); + void UpdateOutstandingData(int64_t outstanding_bytes); + void UpdatePacingQueue(int64_t pacing_bytes); + uint32_t UpdateTargetBitrate(uint32_t bitrate_bps); + void SetDataWindow(int64_t data_window); + + private: + const bool add_pacing_ = true; + const uint32_t min_pushback_target_bitrate_bps_ = 10000; + std::optional current_data_window_ = std::nullopt; + int64_t outstanding_bytes_ = 0; + int64_t pacing_bytes_ = 0; + double encoding_rate_ratio_ = 1.0; +}; + +#endif \ No newline at end of file diff --git a/src/qos/transport_feedback_adapter.cpp b/src/qos/transport_feedback_adapter.cpp new file mode 100644 index 0000000..564317b --- /dev/null +++ b/src/qos/transport_feedback_adapter.cpp @@ -0,0 +1,229 @@ +/* + * Copyright (c) 2015 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#include "transport_feedback_adapter.h" + +#include + +#include +#include +#include +#include +#include + +#include "log.h" + +constexpr int64_t kSendTimeHistoryWindow = 60; + +void InFlightBytesTracker::AddInFlightPacketBytes( + const PacketFeedback& packet) { + auto it = in_flight_data_.find(packet.network_route); + if (it != in_flight_data_.end()) { + it->second += packet.sent.size; + } else { + in_flight_data_.insert({packet.network_route, packet.sent.size}); + } +} + +void InFlightBytesTracker::RemoveInFlightPacketBytes( + const PacketFeedback& packet) { + if (packet.sent.send_time == std::numeric_limits::max() || + packet.sent.send_time == std::numeric_limits::min()) + return; + auto it = in_flight_data_.find(packet.network_route); + if (it != in_flight_data_.end()) { + it->second -= packet.sent.size; + if (it->second == 0) in_flight_data_.erase(it); + } +} + +int64_t InFlightBytesTracker::GetOutstandingData( + const NetworkRoute& network_route) const { + auto it = in_flight_data_.find(network_route); + if (it != in_flight_data_.end()) { + return it->second; + } else { + return 0; + } +} + +// Comparator for consistent map with NetworkRoute as key. +bool InFlightBytesTracker::NetworkRouteComparator::operator()( + const NetworkRoute& a, const NetworkRoute& b) const { + if (a.local.network_id() != b.local.network_id()) + return a.local.network_id() < b.local.network_id(); + if (a.remote.network_id() != b.remote.network_id()) + return a.remote.network_id() < b.remote.network_id(); + + if (a.local.adapter_id() != b.local.adapter_id()) + return a.local.adapter_id() < b.local.adapter_id(); + if (a.remote.adapter_id() != b.remote.adapter_id()) + return a.remote.adapter_id() < b.remote.adapter_id(); + + if (a.local.uses_turn() != b.local.uses_turn()) + return a.local.uses_turn() < b.local.uses_turn(); + if (a.remote.uses_turn() != b.remote.uses_turn()) + return a.remote.uses_turn() < b.remote.uses_turn(); + + return a.connected < b.connected; +} + +TransportFeedbackAdapter::TransportFeedbackAdapter() = default; + +std::optional +TransportFeedbackAdapter::ProcessCongestionControlFeedback( + const CongestionControlFeedback& feedback, int64_t feedback_receive_time) { + if (feedback.packets().empty()) { + LOG_INFO("Empty congestion control feedback packet received"); + return std::nullopt; + } + if (current_offset_ == std::numeric_limits::max() || + current_offset_ == std::numeric_limits::min()) { + current_offset_ = feedback_receive_time; + } + int64_t feedback_delta = last_feedback_compact_ntp_time_ + ? (feedback.report_timestamp_compact_ntp() - + *last_feedback_compact_ntp_time_) + : 0; + last_feedback_compact_ntp_time_ = feedback.report_timestamp_compact_ntp(); + if (feedback_delta < 0) { + LOG_WARN("Unexpected feedback ntp time delta {}", feedback_delta); + current_offset_ = feedback_receive_time; + } else { + current_offset_ += feedback_delta; + } + + int ignored_packets = 0; + int failed_lookups = 0; + bool supports_ecn = true; + std::vector packet_result_vector; + for (const CongestionControlFeedback::PacketInfo& packet_info : + feedback.packets()) { + std::optional packet_feedback = + RetrievePacketFeedback({packet_info.ssrc, packet_info.sequence_number}, + /*received=*/packet_info.arrival_time_offset != + std::numeric_limits::min() && + packet_info.arrival_time_offset != + std::numeric_limits::max()); + if (!packet_feedback) { + ++failed_lookups; + continue; + } + if (packet_feedback->network_route != network_route_) { + ++ignored_packets; + continue; + } + PacketResult result; + result.sent_packet = packet_feedback->sent; + if (packet_info.arrival_time_offset != + std::numeric_limits::min() && + packet_info.arrival_time_offset != + std::numeric_limits::max()) { + result.receive_time = current_offset_ - packet_info.arrival_time_offset; + supports_ecn &= packet_info.ecn != EcnMarking::kNotEct; + } + result.ecn = packet_info.ecn; + packet_result_vector.push_back(result); + } + + if (failed_lookups > 0) { + LOG_WARN( + "Failed to lookup send time for {} packet {}. Packets reordered or " + "send time history too small?", + failed_lookups, (failed_lookups > 1 ? "s" : "")); + } + if (ignored_packets > 0) { + LOG_INFO("Ignoring {} packets because they were sent on a different route", + ignored_packets); + } + + // Feedback is expected to be sorted in send order. + std::sort(packet_result_vector.begin(), packet_result_vector.end(), + [](const PacketResult& lhs, const PacketResult& rhs) { + return lhs.sent_packet.sequence_number < + rhs.sent_packet.sequence_number; + }); + return ToTransportFeedback(std::move(packet_result_vector), + feedback_receive_time, supports_ecn); +} + +std::optional +TransportFeedbackAdapter::ToTransportFeedback( + std::vector packet_results, int64_t feedback_receive_time, + bool supports_ecn) { + TransportPacketsFeedback msg; + msg.feedback_time = feedback_receive_time; + if (packet_results.empty()) { + return std::nullopt; + } + msg.packet_feedbacks = std::move(packet_results); + msg.data_in_flight = in_flight_.GetOutstandingData(network_route_); + msg.transport_supports_ecn = supports_ecn; + + return msg; +} + +void TransportFeedbackAdapter::SetNetworkRoute( + const NetworkRoute& network_route) { + network_route_ = network_route; +} + +int64_t TransportFeedbackAdapter::GetOutstandingData() const { + return in_flight_.GetOutstandingData(network_route_); +} + +std::optional TransportFeedbackAdapter::RetrievePacketFeedback( + const SsrcAndRtpSequencenumber& key, bool received) { + auto it = rtp_to_transport_sequence_number_.find(key); + if (it == rtp_to_transport_sequence_number_.end()) { + return std::nullopt; + } + return RetrievePacketFeedback(it->second, received); +} + +std::optional TransportFeedbackAdapter::RetrievePacketFeedback( + int64_t transport_seq_num, bool received) { + if (transport_seq_num > last_ack_seq_num_) { + // Starts at history_.begin() if last_ack_seq_num_ < 0, since any + // valid sequence number is >= 0. + for (auto it = history_.upper_bound(last_ack_seq_num_); + it != history_.upper_bound(transport_seq_num); ++it) { + in_flight_.RemoveInFlightPacketBytes(it->second); + } + last_ack_seq_num_ = transport_seq_num; + } + + auto it = history_.find(transport_seq_num); + if (it == history_.end()) { + LOG_WARN( + "Failed to lookup send time for packet with {}. Send time history too " + "small?", + transport_seq_num); + return std::nullopt; + } + + if (it->second.sent.send_time == std::numeric_limits::max() || + it->second.sent.send_time == std::numeric_limits::min()) { + // TODO(srte): Fix the tests that makes this happen and make this a + // DCHECK. + LOG_ERROR("Received feedback before packet was indicated as sent"); + return std::nullopt; + } + + PacketFeedback packet_feedback = it->second; + if (received) { + // Note: Lost packets are not removed from history because they might + // be reported as received by a later feedback. + rtp_to_transport_sequence_number_.erase( + {packet_feedback.ssrc, packet_feedback.rtp_sequence_number}); + history_.erase(it); + } + return packet_feedback; +} diff --git a/src/qos/transport_feedback_adapter.h b/src/qos/transport_feedback_adapter.h new file mode 100644 index 0000000..0214cd2 --- /dev/null +++ b/src/qos/transport_feedback_adapter.h @@ -0,0 +1,115 @@ +/* + * @Author: DI JUNKUN + * @Date: 2025-01-13 + * Copyright (c) 2025 by DI JUNKUN, All Rights Reserved. + */ + +#ifndef _TRANSPORT_FEEDBACK_ADAPTER_H_ +#define _TRANSPORT_FEEDBACK_ADAPTER_H_ + +#include +#include +#include +#include +#include +#include + +#include "congestion_control_feedback.h" +#include "network_route.h" +#include "network_types.h" +#include "sequence_number_unwrapper.h" + +struct PacketFeedback { + PacketFeedback() = default; + // Time corresponding to when this object was created. + int64_t creation_time = std::numeric_limits::min(); + SentPacket sent; + // Time corresponding to when the packet was received. Timestamped with the + // receiver's clock. For unreceived packet, + // std::numeric_limits::max() is used. + int64_t receive_time = std::numeric_limits::max(); + + // The network route that this packet is associated with. + NetworkRoute network_route; + + uint32_t ssrc = 0; + uint16_t rtp_sequence_number = 0; +}; + +class InFlightBytesTracker { + public: + void AddInFlightPacketBytes(const PacketFeedback& packet); + void RemoveInFlightPacketBytes(const PacketFeedback& packet); + int64_t GetOutstandingData(const NetworkRoute& network_route) const; + + private: + struct NetworkRouteComparator { + bool operator()(const NetworkRoute& a, const NetworkRoute& b) const; + }; + std::map in_flight_data_; +}; + +// TransportFeedbackAdapter converts RTCP feedback packets to RTCP agnostic per +// packet send/receive information. +// It supports rtcp::CongestionControlFeedback according to RFC 8888 and +// rtcp::TransportFeedback according to +// https://datatracker.ietf.org/doc/html/draft-holmer-rmcat-transport-wide-cc-extensions-01 +class TransportFeedbackAdapter { + public: + TransportFeedbackAdapter(); + + std::optional ProcessCongestionControlFeedback( + const CongestionControlFeedback& feedback, int64_t feedback_receive_time); + + void SetNetworkRoute(const NetworkRoute& network_route); + + int64_t GetOutstandingData() const; + + private: + enum class SendTimeHistoryStatus { kNotAdded, kOk, kDuplicate }; + + struct SsrcAndRtpSequencenumber { + uint32_t ssrc; + uint16_t rtp_sequence_number; + + bool operator<(const SsrcAndRtpSequencenumber& other) const { + return std::tie(ssrc, rtp_sequence_number) < + std::tie(other.ssrc, other.rtp_sequence_number); + } + }; + + std::optional RetrievePacketFeedback( + int64_t transport_seq_num, bool received); + std::optional RetrievePacketFeedback( + const SsrcAndRtpSequencenumber& key, bool received); + std::optional ToTransportFeedback( + std::vector packet_results, int64_t feedback_receive_time, + bool supports_ecn); + + int64_t pending_untracked_size_ = 0; + int64_t last_send_time_ = std::numeric_limits::min(); + int64_t last_untracked_send_time_ = std::numeric_limits::min(); + RtpSequenceNumberUnwrapper seq_num_unwrapper_; + + // Sequence numbers are never negative, using -1 as it always < a real + // sequence number. + int64_t last_ack_seq_num_ = -1; + InFlightBytesTracker in_flight_; + NetworkRoute network_route_; + + int64_t current_offset_ = std::numeric_limits::min(); + + // `last_transport_feedback_base_time` is only used for transport feedback to + // track base time. + int64_t last_transport_feedback_base_time_ = + std::numeric_limits::min(); + // Used by RFC 8888 congestion control feedback to track base time. + std::optional last_feedback_compact_ntp_time_; + + // Map SSRC and RTP sequence number to transport sequence number. + std::map + rtp_to_transport_sequence_number_; + std::map history_; +}; + +#endif \ No newline at end of file diff --git a/src/rtcp/rtcp_packet/rtcp_packet.h b/src/rtcp/rtcp_packet/rtcp_packet.h index 470fa05..0bf32e3 100644 --- a/src/rtcp/rtcp_packet/rtcp_packet.h +++ b/src/rtcp/rtcp_packet/rtcp_packet.h @@ -16,6 +16,7 @@ class RtcpPacket { public: + typedef enum { SR = 200, RR = 201, TCC = 205 } PAYLOAD_TYPE; // Callback used to signal that an RTCP packet is ready. Note that this may // not contain all data in this RtcpPacket; if a packet cannot fit in // max_length bytes, it will be fragmented and multiple calls to this diff --git a/src/rtcp/rtcp_packet/rtcp_packet_info.h b/src/rtcp/rtcp_packet/rtcp_packet_info.h new file mode 100644 index 0000000..c40213f --- /dev/null +++ b/src/rtcp/rtcp_packet/rtcp_packet_info.h @@ -0,0 +1,33 @@ +/* + * @Author: DI JUNKUN + * @Date: 2025-01-13 + * Copyright (c) 2025 by DI JUNKUN, All Rights Reserved. + */ + +#ifndef _PACKET_INFO_H_ +#define _PACKET_INFO_H_ + +#include +#include + +#include +#include +#include + +#include "congestion_control_feedback.h" + +struct RtcpPacketInfo { + uint32_t packet_type_flags = 0; // RTCPPacketTypeFlags bit field. + + uint32_t remote_ssrc = 0; + std::vector nack_sequence_numbers; + // std::vector report_block_datas; + std::optional rtt; + uint32_t receiver_estimated_max_bitrate_bps = 0; + std::optional congestion_control_feedback; + // std::optional target_bitrate_allocation; + // std::optional network_state_estimate; + // std::unique_ptr loss_notification; +}; + +#endif \ No newline at end of file diff --git a/src/transport/ice_transport.cpp b/src/transport/ice_transport.cpp index acca267..f6e7f3f 100644 --- a/src/transport/ice_transport.cpp +++ b/src/transport/ice_transport.cpp @@ -249,13 +249,120 @@ void IceTransport::OnReceiveBuffer(NiceAgent *agent, guint stream_id, data_channel_receive_->OnReceiveRtpPacket(buffer, size); } } else if (CheckIsRtcpPacket(buffer, size)) { - LOG_ERROR("Rtcp packet [{}]", (uint8_t)(buffer[1])); + // LOG_ERROR("Rtcp packet [{}]", (uint8_t)(buffer[1])); + RtcpPacketInfo rtcp_packet_info; + ParseRtcpPacket((const uint8_t *)buffer, size, &rtcp_packet_info); } else { LOG_ERROR("Unknown packet"); } } } +bool IceTransport::ParseRtcpPacket(const uint8_t *buffer, size_t size, + RtcpPacketInfo *rtcp_packet_info) { + RtcpCommonHeader rtcp_block; + // If a sender report is received but no DLRR, we need to reset the + // roundTripTime stat according to the standard, see + // https://www.w3.org/TR/webrtc-stats/#dom-rtcremoteoutboundrtpstreamstats-roundtriptime + struct RtcpReceivedBlock { + bool sender_report = false; + bool dlrr = false; + }; + // For each remote SSRC we store if we've received a sender report or a DLRR + // block. + bool valid = true; + if (!rtcp_block.Parse(buffer, size)) { + valid = false; + } + + switch (rtcp_block.type()) { + case RtcpPacket::PAYLOAD_TYPE::SR: + LOG_INFO("Sender report"); + // valid = HandleSenderReport(rtcp_block, rtcp_packet_info); + // received_blocks[rtcp_packet_info->remote_ssrc].sender_report = true; + break; + case RtcpPacket::PAYLOAD_TYPE::RR: + LOG_INFO("Receiver report"); + // valid = HandleReceiverReport(rtcp_block, rtcp_packet_info); + break; + case RtcpPacket::PAYLOAD_TYPE::TCC: + switch (rtcp_block.fmt()) { + case CongestionControlFeedback::kFeedbackMessageType: + LOG_INFO("Congestion Control Feedback"); + valid = HandleCongestionControlFeedback(rtcp_block, rtcp_packet_info); + break; + default: + break; + } + break; + // case rtcp::Psfb::kPacketType: + // switch (rtcp_block.fmt()) { + // case rtcp::Pli::kFeedbackMessageType: + // valid = HandlePli(rtcp_block, rtcp_packet_info); + // break; + // case rtcp::Fir::kFeedbackMessageType: + // valid = HandleFir(rtcp_block, rtcp_packet_info); + // break; + // case rtcp::Psfb::kAfbMessageType: + // HandlePsfbApp(rtcp_block, rtcp_packet_info); + // break; + // default: + // ++num_skipped_packets_; + // break; + // } + // break; + default: + break; + } + + // if (num_skipped_packets_ > 0) { + // const Timestamp now = env_.clock().CurrentTime(); + // if (now - last_skipped_packets_warning_ >= kMaxWarningLogInterval) { + // last_skipped_packets_warning_ = now; + // RTC_LOG(LS_WARNING) + // << num_skipped_packets_ + // << " RTCP blocks were skipped due to being malformed or of " + // "unrecognized/unsupported type, during the past " + // << kMaxWarningLogInterval << " period."; + // } + // } + + // if (!valid) { + // ++num_skipped_packets_; + // return false; + // } + + // for (const auto &rb : received_blocks) { + // if (rb.second.sender_report && !rb.second.dlrr) { + // auto rtt_stats = non_sender_rtts_.find(rb.first); + // if (rtt_stats != non_sender_rtts_.end()) { + // rtt_stats->second.Invalidate(); + // } + // } + // } + + // if (packet_type_counter_observer_) { + // packet_type_counter_observer_->RtcpPacketTypesCounterUpdated( + // local_media_ssrc(), packet_type_counter_); + // } + + return true; +} + +bool IceTransport::HandleCongestionControlFeedback( + const RtcpCommonHeader &rtcp_block, RtcpPacketInfo *rtcp_packet_info) { + CongestionControlFeedback feedback; + if (!feedback.Parse(rtcp_block) || feedback.packets().empty()) { + return false; + } + // uint32_t first_media_source_ssrc = feedback.packets()[0].ssrc; + // if (first_media_source_ssrc == local_media_ssrc() || + // registered_ssrcs_.contains(first_media_source_ssrc)) { + // rtcp_packet_info->congestion_control_feedback.emplace(std::move(feedback)); + // } + return true; +} + void IceTransport::OnReceiveCompleteFrame(VideoFrame &video_frame) { int num_frame_returned = video_decoder_->Decode( (uint8_t *)video_frame.Buffer(), video_frame.Size(), diff --git a/src/transport/ice_transport.h b/src/transport/ice_transport.h index c885d6e..9b9219e 100644 --- a/src/transport/ice_transport.h +++ b/src/transport/ice_transport.h @@ -19,6 +19,7 @@ #include "ice_agent.h" #include "io_statistics.h" #include "ringbuffer.h" +#include "rtcp_packet_info.h" #include "rtp_audio_receiver.h" #include "rtp_audio_sender.h" #include "rtp_codec.h" @@ -170,6 +171,12 @@ class IceTransport { void OnReceiveCompleteData(const char *data, size_t size); + bool ParseRtcpPacket(const uint8_t *buffer, size_t size, + RtcpPacketInfo *rtcp_packet_info); + + bool HandleCongestionControlFeedback(const RtcpCommonHeader &rtcp_block, + RtcpPacketInfo *rtcp_packet_info); + private: bool use_trickle_ice_ = true; bool enable_turn_ = false; diff --git a/xmake.lua b/xmake.lua index e8a83c4..da806ce 100644 --- a/xmake.lua +++ b/xmake.lua @@ -119,7 +119,7 @@ target("rtcp") target("qos") set_kind("object") - add_deps("log", "rtp") + add_deps("log", "rtp", "rtcp") add_files("src/qos/*.cpp") add_includedirs("src/qos", {public = true})