From d2b45b91e7819c04e3c8c3b179a7cbf787852acc Mon Sep 17 00:00:00 2001 From: dijunkun Date: Thu, 13 Mar 2025 21:11:20 +0800 Subject: [PATCH] [feat] move rtp packet sender out of channel module --- .../meida_channel/audio_channel_send.cpp | 4 +- .../meida_channel/data_channel_send.cpp | 4 +- .../meida_channel/video_channel_send.cpp | 20 +- .../meida_channel/video_channel_send.h | 8 + src/channel/rtp_channel/rtp_audio_sender.cpp | 14 +- src/channel/rtp_channel/rtp_audio_sender.h | 6 +- src/channel/rtp_channel/rtp_data_sender.cpp | 14 +- src/channel/rtp_channel/rtp_data_sender.h | 6 +- .../rtp_channel/rtp_video_receiver.cpp | 7 +- src/channel/rtp_channel/rtp_video_sender.cpp | 39 ++- src/channel/rtp_channel/rtp_video_sender.h | 12 +- src/qos/congestion_control.cpp | 21 +- src/qos/congestion_control.h | 2 + src/qos/pacing_controller.cc | 6 +- src/qos/probe_controller.cc | 40 ++- src/qos/send_side_bandwidth_estimation.cc | 3 +- src/ringbuffer/ringbuffer.h | 25 +- src/rtp/rtp_packet/rtp_packet_history.cpp | 8 +- src/rtp/rtp_packet/rtp_packet_history.h | 12 +- src/rtp/rtp_packetizer/rtp_packetizer.h | 6 +- src/rtp/rtp_packetizer/rtp_packetizer_av1.cpp | 4 +- src/rtp/rtp_packetizer/rtp_packetizer_av1.h | 8 +- .../rtp_packetizer/rtp_packetizer_generic.cpp | 8 +- .../rtp_packetizer/rtp_packetizer_generic.h | 8 +- .../rtp_packetizer/rtp_packetizer_h264.cpp | 81 ++++- src/rtp/rtp_packetizer/rtp_packetizer_h264.h | 10 +- src/transport/ice_transport.cpp | 22 +- src/transport/ice_transport.h | 1 + src/transport/ice_transport_controller.cpp | 36 +- src/transport/ice_transport_controller.h | 2 + src/transport/packet_sender.cpp | 314 ++++++++++++------ src/transport/packet_sender.h | 196 ++++++++--- 32 files changed, 681 insertions(+), 266 deletions(-) diff --git a/src/channel/meida_channel/audio_channel_send.cpp b/src/channel/meida_channel/audio_channel_send.cpp index 1136feb..0e6052a 100644 --- a/src/channel/meida_channel/audio_channel_send.cpp +++ b/src/channel/meida_channel/audio_channel_send.cpp @@ -44,9 +44,9 @@ void AudioChannelSend::Destroy() { int AudioChannelSend::SendAudio(char *data, size_t size) { if (rtp_audio_sender_ && rtp_packetizer_) { - std::vector> rtp_packets = + std::vector> rtp_packets = rtp_packetizer_->Build((uint8_t *)data, (uint32_t)size, 0, true); - rtp_audio_sender_->Enqueue(rtp_packets); + rtp_audio_sender_->Enqueue(std::move(rtp_packets)); } return 0; diff --git a/src/channel/meida_channel/data_channel_send.cpp b/src/channel/meida_channel/data_channel_send.cpp index 581307a..571dbae 100644 --- a/src/channel/meida_channel/data_channel_send.cpp +++ b/src/channel/meida_channel/data_channel_send.cpp @@ -44,9 +44,9 @@ void DataChannelSend::Destroy() { int DataChannelSend::SendData(const char *data, size_t size) { if (rtp_data_sender_ && rtp_packetizer_) { - std::vector> rtp_packets = + std::vector> rtp_packets = rtp_packetizer_->Build((uint8_t *)data, (uint32_t)size, 0, true); - rtp_data_sender_->Enqueue(rtp_packets); + rtp_data_sender_->Enqueue(std::move(rtp_packets)); } return 0; diff --git a/src/channel/meida_channel/video_channel_send.cpp b/src/channel/meida_channel/video_channel_send.cpp index 7e9b094..96095cf 100644 --- a/src/channel/meida_channel/video_channel_send.cpp +++ b/src/channel/meida_channel/video_channel_send.cpp @@ -48,6 +48,21 @@ void VideoChannelSend::Initialize(rtp::PAYLOAD_TYPE payload_type) { rtp_video_sender_->Start(); } +void VideoChannelSend::SetEnqueuePacketsFunc( + std::function>&)> + enqueue_packets_func) { + rtp_video_sender_->SetEnqueuePacketsFunc(enqueue_packets_func); +} + +std::vector> VideoChannelSend::GeneratePadding( + uint32_t payload_size, int64_t capture_timestamp_ms) { + if (rtp_packetizer_) { + return rtp_packetizer_->BuildPadding(payload_size, capture_timestamp_ms, + true); + } + return std::vector>{}; +} + void VideoChannelSend::Destroy() { if (rtp_video_sender_) { rtp_video_sender_->Stop(); @@ -57,11 +72,12 @@ void VideoChannelSend::Destroy() { int VideoChannelSend::SendVideo( std::shared_ptr encoded_frame) { if (rtp_video_sender_ && rtp_packetizer_) { - std::vector> rtp_packets = + std::vector> rtp_packets = rtp_packetizer_->Build((uint8_t*)encoded_frame->Buffer(), (uint32_t)encoded_frame->Size(), encoded_frame->CaptureTimestamp(), true); - rtp_video_sender_->Enqueue(rtp_packets, encoded_frame->CaptureTimestamp()); + rtp_video_sender_->Enqueue(std::move(rtp_packets), + encoded_frame->CaptureTimestamp()); } return 0; diff --git a/src/channel/meida_channel/video_channel_send.h b/src/channel/meida_channel/video_channel_send.h index 92278dc..f3334d5 100644 --- a/src/channel/meida_channel/video_channel_send.h +++ b/src/channel/meida_channel/video_channel_send.h @@ -28,6 +28,14 @@ class VideoChannelSend { on_sent_packet_func_); ~VideoChannelSend(); + void SetEnqueuePacketsFunc( + std::function< + void(std::vector>&)> + enqueue_packets_func); + + std::vector> GeneratePadding( + uint32_t payload_size, int64_t capture_timestamp_ms); + public: void Initialize(rtp::PAYLOAD_TYPE payload_type); void Destroy(); diff --git a/src/channel/rtp_channel/rtp_audio_sender.cpp b/src/channel/rtp_channel/rtp_audio_sender.cpp index 8e22a67..e323c3e 100644 --- a/src/channel/rtp_channel/rtp_audio_sender.cpp +++ b/src/channel/rtp_channel/rtp_audio_sender.cpp @@ -23,14 +23,14 @@ RtpAudioSender::~RtpAudioSender() { } void RtpAudioSender::Enqueue( - std::vector> rtp_packets) { + std::vector> rtp_packets) { if (!rtp_statistics_) { rtp_statistics_ = std::make_unique(); rtp_statistics_->Start(); } for (auto& rtp_packet : rtp_packets) { - rtp_packet_queue_.push(rtp_packet); + rtp_packet_queue_.push(std::move(rtp_packet)); } } @@ -39,7 +39,7 @@ void RtpAudioSender::SetSendDataFunc( data_send_func_ = data_send_func; } -int RtpAudioSender::SendRtpPacket(std::shared_ptr rtp_packet) { +int RtpAudioSender::SendRtpPacket(std::unique_ptr rtp_packet) { if (!data_send_func_) { LOG_ERROR("data_send_func_ is nullptr"); return -1; @@ -141,9 +141,11 @@ bool RtpAudioSender::Process() { for (size_t i = 0; i < 10; i++) if (!rtp_packet_queue_.isEmpty()) { - std::shared_ptr rtp_packet; - rtp_packet_queue_.pop(rtp_packet); - SendRtpPacket(rtp_packet); + std::optional> rtp_packet = + rtp_packet_queue_.pop(); + if (rtp_packet) { + SendRtpPacket(std::move(*rtp_packet)); + } } if (rtp_statistics_) { diff --git a/src/channel/rtp_channel/rtp_audio_sender.h b/src/channel/rtp_channel/rtp_audio_sender.h index a57bd11..d2c6e0f 100644 --- a/src/channel/rtp_channel/rtp_audio_sender.h +++ b/src/channel/rtp_channel/rtp_audio_sender.h @@ -24,13 +24,13 @@ class RtpAudioSender : public ThreadBase { virtual ~RtpAudioSender(); public: - void Enqueue(std::vector> rtp_packets); + void Enqueue(std::vector> rtp_packets); void SetSendDataFunc(std::function data_send_func); uint32_t GetSsrc() { return ssrc_; } void OnReceiverReport(const ReceiverReport &receiver_report) {} private: - int SendRtpPacket(std::shared_ptr rtp_packet); + int SendRtpPacket(std::unique_ptr rtp_packet); int SendRtcpSR(SenderReport &rtcp_sr); bool CheckIsTimeSendSR(); @@ -40,7 +40,7 @@ class RtpAudioSender : public ThreadBase { private: std::function data_send_func_ = nullptr; - RingBuffer> rtp_packet_queue_; + RingBuffer> rtp_packet_queue_; private: uint32_t ssrc_ = 0; diff --git a/src/channel/rtp_channel/rtp_data_sender.cpp b/src/channel/rtp_channel/rtp_data_sender.cpp index a5c48e4..3d92137 100644 --- a/src/channel/rtp_channel/rtp_data_sender.cpp +++ b/src/channel/rtp_channel/rtp_data_sender.cpp @@ -23,14 +23,14 @@ RtpDataSender::~RtpDataSender() { } void RtpDataSender::Enqueue( - std::vector> rtp_packets) { + std::vector> rtp_packets) { if (!rtp_statistics_) { rtp_statistics_ = std::make_unique(); rtp_statistics_->Start(); } for (auto& rtp_packet : rtp_packets) { - rtp_packet_queue_.push(rtp_packet); + rtp_packet_queue_.push(std::move(rtp_packet)); } } @@ -39,7 +39,7 @@ void RtpDataSender::SetSendDataFunc( data_send_func_ = data_send_func; } -int RtpDataSender::SendRtpPacket(std::shared_ptr rtp_packet) { +int RtpDataSender::SendRtpPacket(std::unique_ptr rtp_packet) { if (!data_send_func_) { LOG_ERROR("data_send_func_ is nullptr"); return -1; @@ -141,9 +141,11 @@ bool RtpDataSender::Process() { for (size_t i = 0; i < 10; i++) if (!rtp_packet_queue_.isEmpty()) { - std::shared_ptr rtp_packet; - rtp_packet_queue_.pop(rtp_packet); - SendRtpPacket(rtp_packet); + std::optional> rtp_packet = + rtp_packet_queue_.pop(); + if (rtp_packet) { + SendRtpPacket(std::move(*rtp_packet)); + } } if (rtp_statistics_) { diff --git a/src/channel/rtp_channel/rtp_data_sender.h b/src/channel/rtp_channel/rtp_data_sender.h index b629a05..bdd9f90 100644 --- a/src/channel/rtp_channel/rtp_data_sender.h +++ b/src/channel/rtp_channel/rtp_data_sender.h @@ -24,14 +24,14 @@ class RtpDataSender : public ThreadBase { virtual ~RtpDataSender(); public: - void Enqueue(std::vector> rtp_packets); + void Enqueue(std::vector> rtp_packets); void SetSendDataFunc(std::function data_send_func); uint32_t GetSsrc() { return ssrc_; } void OnReceiverReport(const ReceiverReport &receiver_report) {} private: private: - int SendRtpPacket(std::shared_ptr rtp_packet); + int SendRtpPacket(std::unique_ptr rtp_packet); int SendRtcpSR(SenderReport &rtcp_sr); bool CheckIsTimeSendSR(); @@ -41,7 +41,7 @@ class RtpDataSender : public ThreadBase { private: std::function data_send_func_ = nullptr; - RingBuffer> rtp_packet_queue_; + RingBuffer> rtp_packet_queue_; private: uint32_t ssrc_ = 0; diff --git a/src/channel/rtp_channel/rtp_video_receiver.cpp b/src/channel/rtp_channel/rtp_video_receiver.cpp index 0d29923..3d8f20c 100644 --- a/src/channel/rtp_channel/rtp_video_receiver.cpp +++ b/src/channel/rtp_channel/rtp_video_receiver.cpp @@ -532,9 +532,8 @@ bool RtpVideoReceiver::CheckIsTimeSendRR() { bool RtpVideoReceiver::Process() { if (!compelete_video_frame_queue_.isEmpty()) { - VideoFrame video_frame; - compelete_video_frame_queue_.pop(video_frame); - if (on_receive_complete_frame_) { + std::optional video_frame = compelete_video_frame_queue_.pop(); + if (on_receive_complete_frame_ && video_frame) { // auto now_complete_frame_ts = // std::chrono::duration_cast( // std::chrono::system_clock::now().time_since_epoch()) @@ -543,7 +542,7 @@ bool RtpVideoReceiver::Process() { // LOG_ERROR("Duration {}", duration); // last_complete_frame_ts_ = now_complete_frame_ts; - on_receive_complete_frame_(video_frame); + on_receive_complete_frame_(*video_frame); // #ifdef SAVE_RTP_RECV_STREAM // fwrite((unsigned char*)video_frame.Buffer(), 1, // video_frame.Size(), diff --git a/src/channel/rtp_channel/rtp_video_sender.cpp b/src/channel/rtp_channel/rtp_video_sender.cpp index 67c9604..001c19b 100644 --- a/src/channel/rtp_channel/rtp_video_sender.cpp +++ b/src/channel/rtp_channel/rtp_video_sender.cpp @@ -44,22 +44,26 @@ RtpVideoSender::~RtpVideoSender() { } void RtpVideoSender::Enqueue( - std::vector>& rtp_packets, + std::vector>& rtp_packets, int64_t capture_timestamp_ms) { if (!rtp_statistics_) { rtp_statistics_ = std::make_unique(); rtp_statistics_->Start(); } + std::vector> to_send_rtp_packets; for (auto& rtp_packet : rtp_packets) { - std::shared_ptr rtp_packet_to_send = - std::dynamic_pointer_cast(rtp_packet); + std::unique_ptr rtp_packet_to_send( + static_cast(rtp_packet.release())); rtp_packet_to_send->set_capture_time( webrtc::Timestamp::Millis(capture_timestamp_ms)); rtp_packet_to_send->set_transport_sequence_number(transport_seq_++); rtp_packet_to_send->set_packet_type(webrtc::RtpPacketMediaType::kVideo); - rtp_packet_queue_.push(std::move(rtp_packet_to_send)); + // rtp_packet_queue_.push(std::move(rtp_packet_to_send)); + + to_send_rtp_packets.push_back(std::move(rtp_packet_to_send)); } + enqueue_packets_func_(std::move(to_send_rtp_packets)); } void RtpVideoSender::SetSendDataFunc( @@ -72,18 +76,19 @@ void RtpVideoSender::SetOnSentPacketFunc( on_sent_packet_func_ = on_sent_packet_func; } +void RtpVideoSender::SetEnqueuePacketsFunc( + std::function>&)> + enqueue_packets_func) { + enqueue_packets_func_ = enqueue_packets_func; +} + int RtpVideoSender::SendRtpPacket( - std::shared_ptr rtp_packet_to_send) { + std::unique_ptr rtp_packet_to_send) { if (!data_send_func_) { LOG_ERROR("data_send_func_ is nullptr"); return -1; } - if (on_sent_packet_func_) { - on_sent_packet_func_(*rtp_packet_to_send); - rtp_packet_history_->AddPacket(rtp_packet_to_send, clock_->CurrentTime()); - } - last_rtp_timestamp_ = rtp_packet_to_send->capture_time().ms(); int ret = data_send_func_((const char*)rtp_packet_to_send->Buffer().data(), @@ -124,6 +129,12 @@ int RtpVideoSender::SendRtpPacket( SendRtcpSR(rtcp_sr); } + if (on_sent_packet_func_) { + on_sent_packet_func_(*rtp_packet_to_send); + rtp_packet_history_->AddPacket(std::move(rtp_packet_to_send), + clock_->CurrentTime()); + } + return 0; } @@ -164,10 +175,10 @@ bool RtpVideoSender::Process() { for (size_t i = 0; i < 10; i++) if (!rtp_packet_queue_.isEmpty()) { - std::shared_ptr rtp_packet_to_send; - pop_success = rtp_packet_queue_.pop(rtp_packet_to_send); - if (pop_success) { - SendRtpPacket(rtp_packet_to_send); + std::optional> + rtp_packet_to_send = rtp_packet_queue_.pop(); + if (rtp_packet_to_send) { + SendRtpPacket(std::move(*rtp_packet_to_send)); } } diff --git a/src/channel/rtp_channel/rtp_video_sender.h b/src/channel/rtp_channel/rtp_video_sender.h index 2a171c6..12a50ab 100644 --- a/src/channel/rtp_channel/rtp_video_sender.h +++ b/src/channel/rtp_channel/rtp_video_sender.h @@ -23,17 +23,21 @@ class RtpVideoSender : public ThreadBase { virtual ~RtpVideoSender(); public: - void Enqueue(std::vector> &rtp_packets, + void Enqueue(std::vector> &rtp_packets, int64_t capture_timestamp_ms); void SetSendDataFunc(std::function data_send_func); void SetOnSentPacketFunc( std::function on_sent_packet_func); + void SetEnqueuePacketsFunc( + std::function< + void(std::vector> &)> + enqueue_packets_func); uint32_t GetSsrc() { return ssrc_; } void OnReceiverReport(const ReceiverReport &receiver_report); private: int SendRtpPacket( - std::shared_ptr rtp_packet_to_send); + std::unique_ptr rtp_packet_to_send); int SendRtcpSR(SenderReport &rtcp_sr); bool CheckIsTimeSendSR(); @@ -45,7 +49,9 @@ class RtpVideoSender : public ThreadBase { std::function data_send_func_ = nullptr; std::function on_sent_packet_func_ = nullptr; - RingBuffer> rtp_packet_queue_; + std::function> &)> + enqueue_packets_func_ = nullptr; + RingBuffer> rtp_packet_queue_; private: uint32_t ssrc_ = 0; diff --git a/src/qos/congestion_control.cpp b/src/qos/congestion_control.cpp index 1e092cb..80adbdb 100644 --- a/src/qos/congestion_control.cpp +++ b/src/qos/congestion_control.cpp @@ -26,23 +26,30 @@ BandwidthLimitedCause GetBandwidthLimitedCause(LossBasedState loss_based_state, BandwidthUsage bandwidth_usage) { if (bandwidth_usage == BandwidthUsage::kBwOverusing || bandwidth_usage == BandwidthUsage::kBwUnderusing) { + LOG_ERROR("kDelayBasedLimitedDelayIncreased"); return BandwidthLimitedCause::kDelayBasedLimitedDelayIncreased; } else if (is_rtt_above_limit) { + LOG_ERROR("kDelayBasedLimitedDelayIncreased"); return BandwidthLimitedCause::kRttBasedBackOffHighRtt; } switch (loss_based_state) { case LossBasedState::kDecreasing: // Probes may not be sent in this state. + LOG_ERROR("kLossLimitedBwe"); return BandwidthLimitedCause::kLossLimitedBwe; case webrtc::LossBasedState::kIncreaseUsingPadding: // Probes may not be sent in this state. + LOG_ERROR("kLossLimitedBwe"); return BandwidthLimitedCause::kLossLimitedBwe; case LossBasedState::kIncreasing: + LOG_ERROR("kLossLimitedBweIncreasing"); // Probes may be sent in this state. return BandwidthLimitedCause::kLossLimitedBweIncreasing; case LossBasedState::kDelayBasedEstimate: + // LOG_ERROR("kDelayBasedLimited"); return BandwidthLimitedCause::kDelayBasedLimited; default: + LOG_ERROR("kLossLimitedBwe"); return BandwidthLimitedCause::kLossLimitedBwe; } } @@ -87,6 +94,13 @@ CongestionControl::CongestionControl() CongestionControl::~CongestionControl() {} +NetworkControlUpdate CongestionControl::OnNetworkAvailability( + NetworkAvailability msg) { + NetworkControlUpdate update; + update.probe_cluster_configs = probe_controller_->OnNetworkAvailability(msg); + return update; +} + NetworkControlUpdate CongestionControl::OnProcessInterval(ProcessInterval msg) { NetworkControlUpdate update; if (initial_config_) { @@ -122,7 +136,6 @@ NetworkControlUpdate CongestionControl::OnProcessInterval(ProcessInterval msg) { auto probes = probe_controller_->Process(msg.at_time); update.probe_cluster_configs.insert(update.probe_cluster_configs.end(), probes.begin(), probes.end()); - update.congestion_window = current_data_window_; MaybeTriggerOnNetworkChanged(&update, msg.at_time); @@ -407,8 +420,10 @@ void CongestionControl::MaybeTriggerOnNetworkChanged( update->probe_cluster_configs.insert(update->probe_cluster_configs.end(), probes.begin(), probes.end()); update->pacer_config = GetPacingRates(at_time); - // LOG_INFO("bwe {} pushback_target_bps={} estimate_bps={}", at_time.ms(), - // last_pushback_target_rate_.bps(), loss_based_target_rate.bps()); + // LOG_INFO("bwe {} pushback_target_bps={} estimate_bps={}", + // at_time.ms(), + // last_pushback_target_rate_.bps(), + // loss_based_target_rate.bps()); } } diff --git a/src/qos/congestion_control.h b/src/qos/congestion_control.h index 39223f6..ea5f8e8 100644 --- a/src/qos/congestion_control.h +++ b/src/qos/congestion_control.h @@ -23,6 +23,8 @@ class CongestionControl { ~CongestionControl(); public: + NetworkControlUpdate OnNetworkAvailability(NetworkAvailability msg); + NetworkControlUpdate OnProcessInterval(ProcessInterval msg); NetworkControlUpdate OnTransportLossReport(TransportLossReport msg); diff --git a/src/qos/pacing_controller.cc b/src/qos/pacing_controller.cc index 5156684..5a105c9 100644 --- a/src/qos/pacing_controller.cc +++ b/src/qos/pacing_controller.cc @@ -397,6 +397,8 @@ void PacingController::ProcessPackets() { if (now + early_execute_margin < target_send_time) { // We are too early, but if queue is empty still allow draining some debt. // Probing is allowed to be sent up to kMinSleepTime early. + LOG_ERROR("!!!!!!! too early, target_send_time {}, now {}, {}", + target_send_time.ms(), now.ms(), early_execute_margin.ms()); UpdateBudgetWithElapsedTime(UpdateTimeAndGetElapsed(now)); return; } @@ -664,8 +666,8 @@ void PacingController::MaybeUpdateMediaRateDueToLongQueue(Timestamp now) { DataRate min_rate_needed = queue_size_data / avg_time_left; if (min_rate_needed > pacing_rate_) { adjusted_media_rate_ = min_rate_needed; - LOG_INFO("bwe:large_pacing_queue pacing_rate_kbps={}", - pacing_rate_.kbps()); + // LOG_INFO("bwe:large_pacing_queue pacing_rate_kbps={}", + // pacing_rate_.kbps()); } } } diff --git a/src/qos/probe_controller.cc b/src/qos/probe_controller.cc index 0e43a5e..af814c4 100644 --- a/src/qos/probe_controller.cc +++ b/src/qos/probe_controller.cc @@ -108,6 +108,7 @@ std::vector ProbeController::SetBitrates( if (start_bitrate > DataRate::Zero()) { start_bitrate_ = start_bitrate; estimated_bitrate_ = start_bitrate; + LOG_WARN("1 setting estimated_bitrate_ = {}", estimated_bitrate_.bps()); } else if (start_bitrate_.IsZero()) { start_bitrate_ = min_bitrate; } @@ -117,10 +118,11 @@ std::vector ProbeController::SetBitrates( DataRate old_max_bitrate = max_bitrate_; max_bitrate_ = max_bitrate.IsFinite() ? max_bitrate : kDefaultMaxProbingBitrate; - switch (state_) { case State::kInit: - if (network_available_) return InitiateExponentialProbing(at_time); + if (network_available_) { + return InitiateExponentialProbing(at_time); + } break; case State::kWaitingForProbingResult: @@ -131,6 +133,7 @@ std::vector ProbeController::SetBitrates( // estimate then initiate probing. if (!estimated_bitrate_.IsZero() && old_max_bitrate < max_bitrate_ && estimated_bitrate_ < max_bitrate_) { + LOG_WARN("probing complete"); return InitiateProbing(at_time, {max_bitrate_}, false); } break; @@ -150,8 +153,9 @@ std::vector ProbeController::OnMaxTotalAllocatedBitrate( allow_allocation_probe) { max_total_allocated_bitrate_ = max_total_allocated_bitrate; - if (!config_.first_allocation_probe_scale) + if (!config_.first_allocation_probe_scale) { return std::vector(); + } DataRate first_probe_rate = max_total_allocated_bitrate * config_.first_allocation_probe_scale; @@ -174,7 +178,7 @@ std::vector ProbeController::OnMaxTotalAllocatedBitrate( probes.push_back(second_probe_rate); } bool allow_further_probing = limited_by_current_bwe; - + LOG_WARN("allow_further_probing {}", allow_further_probing); return InitiateProbing(at_time, probes, allow_further_probing); } if (!max_total_allocated_bitrate.IsZero()) { @@ -228,12 +232,10 @@ std::vector ProbeController::InitiateExponentialProbing( max_total_allocated_bitrate_.IsZero()) { last_allowed_repeated_initial_probe_ = at_time + config_.repeated_initial_probing_time_period; - // LOG_INFO("Repeated initial probing enabled, last allowed probe: {} now: - // {}", - // ToString(last_allowed_repeated_initial_probe_), - // ToString(at_time)); + LOG_INFO("Repeated initial probing enabled, last allowed probe: {} now: {}", + last_allowed_repeated_initial_probe_.ms(), at_time.ms()); } - + LOG_WARN("InitiateExponentialProbing"); return InitiateProbing(at_time, probes, true); } @@ -246,7 +248,6 @@ std::vector ProbeController::SetEstimatedBitrate( bitrate_before_last_large_drop_ = estimated_bitrate_; } estimated_bitrate_ = bitrate; - if (state_ == State::kWaitingForProbingResult) { // Continue probing if probing results indicate channel has greater // capacity unless we already reached the needed bitrate. @@ -264,13 +265,14 @@ std::vector ProbeController::SetEstimatedBitrate( ? network_estimate_->link_capacity_upper * config_.further_probe_threshold : DataRate::PlusInfinity(); - // LOG_INFO( - // "Measured bitrate: {} Minimum to probe further: {} upper limit: {}", - // bitrate, ToString(min_bitrate_to_probe_further_), - // ToString(network_state_estimate_probe_further_limit)); + LOG_INFO( + "Measured bitrate: {} Minimum to probe further: {} upper limit: {}", + bitrate.bps(), min_bitrate_to_probe_further_.bps(), + network_state_estimate_probe_further_limit.bps()); if (bitrate > min_bitrate_to_probe_further_ && bitrate <= network_state_estimate_probe_further_limit) { + LOG_WARN("InitiateProbing SetEstimatedBitrate"); return InitiateProbing( at_time, {config_.further_exponential_probe_scale * bitrate}, true); } @@ -323,6 +325,7 @@ std::vector ProbeController::RequestProbe( time_since_probe > kMinTimeBetweenAlrProbes) { LOG_INFO("Detected big bandwidth drop, start probing"); last_bwe_drop_probing_time_ = at_time; + LOG_WARN("InitiateProbing RequestProbe"); return InitiateProbing(at_time, {suggested_probe}, false); } } @@ -341,6 +344,7 @@ void ProbeController::Reset(Timestamp at_time) { min_bitrate_to_probe_further_ = DataRate::PlusInfinity(); time_last_probing_initiated_ = Timestamp::Zero(); estimated_bitrate_ = DataRate::Zero(); + LOG_WARN("3 setting estimated_bitrate_ = {}", estimated_bitrate_.bps()); network_estimate_ = std::nullopt; start_bitrate_ = DataRate::Zero(); max_bitrate_ = kDefaultMaxProbingBitrate; @@ -413,15 +417,21 @@ std::vector ProbeController::Process(Timestamp at_time) { UpdateState(State::kProbingComplete); } } + if (estimated_bitrate_.IsZero() || state_ != State::kProbingComplete) { return {}; } if (TimeForNextRepeatedInitialProbe(at_time)) { + LOG_WARN( + "InitiateProbing TimeForNextRepeatedInitialProbe, estimated_bitrate_ = " + "{}", + estimated_bitrate_.bps()); return InitiateProbing( at_time, {estimated_bitrate_ * config_.first_exponential_probe_scale}, true); } if (TimeForAlrProbe(at_time) || TimeForNetworkStateProbe(at_time)) { + LOG_WARN("InitiateProbing TimeForNetworkStateProbe"); return InitiateProbing( at_time, {estimated_bitrate_ * config_.alr_probe_scale}, true); } @@ -505,7 +515,6 @@ std::vector ProbeController::InitiateProbing( if (config_.network_state_estimate_probing_interval.IsFinite() && network_estimate_ && network_estimate_->link_capacity_upper.IsFinite()) { if (network_estimate_->link_capacity_upper.IsZero()) { - LOG_INFO("Not sending probe, Network state estimate is zero"); return {}; } max_probe_bitrate = std::min( @@ -522,6 +531,7 @@ std::vector ProbeController::InitiateProbing( } pending_probes.push_back(CreateProbeClusterConfig(now, bitrate)); } + LOG_ERROR("2 pending probes size {}", pending_probes.size()); time_last_probing_initiated_ = now; if (probe_further) { UpdateState(State::kWaitingForProbingResult); diff --git a/src/qos/send_side_bandwidth_estimation.cc b/src/qos/send_side_bandwidth_estimation.cc index 6275a4d..ec0fd08 100644 --- a/src/qos/send_side_bandwidth_estimation.cc +++ b/src/qos/send_side_bandwidth_estimation.cc @@ -163,6 +163,7 @@ SendSideBandwidthEstimation::SendSideBandwidthEstimation() low_loss_threshold_(kDefaultLowLossThreshold), high_loss_threshold_(kDefaultHighLossThreshold), bitrate_threshold_(kDefaultBitrateThreshold), + loss_based_state_(LossBasedState::kDelayBasedEstimate), disable_receiver_limit_caps_only_(false) { // rtt_backoff_ = } @@ -407,7 +408,7 @@ void SendSideBandwidthEstimation::UpdateEstimate(Timestamp at_time) { // it would take over one second since the lower packet loss to achieve // 108kbps. DataRate new_bitrate = DataRate::BitsPerSec( - min_bitrate_history_.front().second.bps() * 1.5 + 0.5); + min_bitrate_history_.front().second.bps() * 1.08 + 0.5); // Add 1 kbps extra, just to make sure that we do not get stuck // (gives a little extra increase at low rates, negligible at higher diff --git a/src/ringbuffer/ringbuffer.h b/src/ringbuffer/ringbuffer.h index fcd207a..f07be2c 100644 --- a/src/ringbuffer/ringbuffer.h +++ b/src/ringbuffer/ringbuffer.h @@ -4,6 +4,7 @@ #include #include #include +#include int RingBufferDummy(); @@ -62,37 +63,25 @@ class RingBuffer { bool isFull() const { return m_front == (m_rear + 1) % m_size; } - bool push(const T& value) { + bool push(T value) { if (isFull()) { return false; } if (!m_data) { return false; } - m_data[m_rear] = value; + m_data[m_rear] = std::move(value); m_rear = (m_rear + 1) % m_size; return true; } - bool push(const T* value) { - if (isFull()) { - return false; - } - if (!m_data) { - return false; - } - m_data[m_rear] = *value; - m_rear = (m_rear + 1) % m_size; - return true; - } - - bool pop(T& value) { + std::optional pop() { if (isEmpty()) { - return false; + return std::nullopt; } - value = m_data[m_front]; + std::optional value = std::move(m_data[m_front]); m_front = (m_front + 1) % m_size; - return true; + return value; } unsigned int front() const { return m_front; } diff --git a/src/rtp/rtp_packet/rtp_packet_history.cpp b/src/rtp/rtp_packet/rtp_packet_history.cpp index f9722a9..df48ae4 100644 --- a/src/rtp/rtp_packet/rtp_packet_history.cpp +++ b/src/rtp/rtp_packet/rtp_packet_history.cpp @@ -17,7 +17,7 @@ void RtpPacketHistory::SetRtt(webrtc::TimeDelta rtt) { } void RtpPacketHistory::AddPacket( - std::shared_ptr rtp_packet, + std::unique_ptr rtp_packet, webrtc::Timestamp send_time) { RemoveDeadPackets(); const uint16_t rtp_seq_no = rtp_packet->SequenceNumber(); @@ -40,7 +40,7 @@ void RtpPacketHistory::AddPacket( rtp_packet_history_.emplace_back(); } - rtp_packet_history_[packet_index] = {rtp_packet, send_time, + rtp_packet_history_[packet_index] = {std::move(rtp_packet), send_time, packets_inserted_++}; } @@ -79,10 +79,10 @@ void RtpPacketHistory::RemoveDeadPackets() { } } -std::shared_ptr RtpPacketHistory::RemovePacket( +std::unique_ptr RtpPacketHistory::RemovePacket( int packet_index) { // Move the packet out from the StoredPacket container. - std::shared_ptr rtp_packet = + std::unique_ptr rtp_packet = std::move(rtp_packet_history_[packet_index].rtp_packet); if (packet_index == 0) { while (!rtp_packet_history_.empty() && diff --git a/src/rtp/rtp_packet/rtp_packet_history.h b/src/rtp/rtp_packet/rtp_packet_history.h index 6de6e3d..9537693 100644 --- a/src/rtp/rtp_packet/rtp_packet_history.h +++ b/src/rtp/rtp_packet/rtp_packet_history.h @@ -31,25 +31,27 @@ class RtpPacketHistory { public: void SetRtt(webrtc::TimeDelta rtt); - void AddPacket(std::shared_ptr rtp_packet, + void AddPacket(std::unique_ptr rtp_packet, webrtc::Timestamp send_time); void RemoveDeadPackets(); private: - std::shared_ptr RemovePacket(int packet_index); + std::unique_ptr RemovePacket(int packet_index); int GetPacketIndex(uint16_t sequence_number) const; private: struct RtpPacketToSendInfo { RtpPacketToSendInfo() = default; - RtpPacketToSendInfo(std::shared_ptr rtp_packet, + RtpPacketToSendInfo(std::unique_ptr rtp_packet, webrtc::Timestamp send_time, uint64_t index) - : rtp_packet(rtp_packet), send_time(send_time), index(index) {} + : rtp_packet(std::move(rtp_packet)), + send_time(send_time), + index(index) {} RtpPacketToSendInfo(RtpPacketToSendInfo&&) = default; RtpPacketToSendInfo& operator=(RtpPacketToSendInfo&&) = default; ~RtpPacketToSendInfo() = default; - std::shared_ptr rtp_packet; + std::unique_ptr rtp_packet; webrtc::Timestamp send_time = webrtc::Timestamp::Zero(); uint64_t index; }; diff --git a/src/rtp/rtp_packetizer/rtp_packetizer.h b/src/rtp/rtp_packetizer/rtp_packetizer.h index 582a53f..7911afb 100644 --- a/src/rtp/rtp_packetizer/rtp_packetizer.h +++ b/src/rtp/rtp_packetizer/rtp_packetizer.h @@ -21,9 +21,13 @@ class RtpPacketizer { virtual ~RtpPacketizer() = default; - virtual std::vector> Build( + virtual std::vector> Build( uint8_t* payload, uint32_t payload_size, int64_t capture_timestamp_ms, bool use_rtp_packet_to_send) = 0; + + virtual std::vector> BuildPadding( + uint32_t payload_size, int64_t capture_timestamp_ms, + bool use_rtp_packet_to_send) = 0; }; #endif \ No newline at end of file diff --git a/src/rtp/rtp_packetizer/rtp_packetizer_av1.cpp b/src/rtp/rtp_packetizer/rtp_packetizer_av1.cpp index 373d1c6..6b97162 100644 --- a/src/rtp/rtp_packetizer/rtp_packetizer_av1.cpp +++ b/src/rtp/rtp_packetizer/rtp_packetizer_av1.cpp @@ -4,10 +4,10 @@ RtpPacketizerAv1::RtpPacketizerAv1(uint32_t ssrc) {} RtpPacketizerAv1::~RtpPacketizerAv1() {} -std::vector> RtpPacketizerAv1::Build( +std::vector> RtpPacketizerAv1::Build( uint8_t* payload, uint32_t payload_size, int64_t capture_timestamp_ms, bool use_rtp_packet_to_send) { - std::vector> rtp_packets; + std::vector> rtp_packets; return rtp_packets; } diff --git a/src/rtp/rtp_packetizer/rtp_packetizer_av1.h b/src/rtp/rtp_packetizer/rtp_packetizer_av1.h index 2d6c4bb..4c21ad9 100644 --- a/src/rtp/rtp_packetizer/rtp_packetizer_av1.h +++ b/src/rtp/rtp_packetizer/rtp_packetizer_av1.h @@ -15,10 +15,16 @@ class RtpPacketizerAv1 : public RtpPacketizer { virtual ~RtpPacketizerAv1(); - std::vector> Build( + std::vector> Build( uint8_t* payload, uint32_t payload_size, int64_t capture_timestamp_ms, bool use_rtp_packet_to_send) override; + std::vector> BuildPadding( + uint32_t payload_size, int64_t capture_timestamp_ms, + bool use_rtp_packet_to_send) override { + return std::vector>{}; + }; + private: uint8_t version_; bool has_padding_; diff --git a/src/rtp/rtp_packetizer/rtp_packetizer_generic.cpp b/src/rtp/rtp_packetizer/rtp_packetizer_generic.cpp index 6bfae8f..8060dd0 100644 --- a/src/rtp/rtp_packetizer/rtp_packetizer_generic.cpp +++ b/src/rtp/rtp_packetizer/rtp_packetizer_generic.cpp @@ -46,7 +46,7 @@ void RtpPacketizerGeneric::AddAbsSendTimeExtension( rtp_packet_frame.push_back(abs_send_time & 0xFF); } -std::vector> RtpPacketizerGeneric::Build( +std::vector> RtpPacketizerGeneric::Build( uint8_t* payload, uint32_t payload_size, int64_t capture_timestamp_ms, bool use_rtp_packet_to_send) { uint32_t last_packet_size = payload_size % MAX_NALU_LEN; @@ -58,7 +58,7 @@ std::vector> RtpPacketizerGeneric::Build( std::chrono::system_clock::now().time_since_epoch()) .count(); - std::vector> rtp_packets; + std::vector> rtp_packets; for (uint32_t index = 0; index < packet_num; index++) { version_ = kRtpVersion; @@ -109,12 +109,12 @@ std::vector> RtpPacketizerGeneric::Build( } if (use_rtp_packet_to_send) { - std::shared_ptr rtp_packet = + std::unique_ptr rtp_packet = std::make_unique(); rtp_packet->Build(rtp_packet_frame_.data(), rtp_packet_frame_.size()); rtp_packets.emplace_back(std::move(rtp_packet)); } else { - std::shared_ptr rtp_packet = std::make_unique(); + std::unique_ptr rtp_packet = std::make_unique(); rtp_packet->Build(rtp_packet_frame_.data(), rtp_packet_frame_.size()); rtp_packets.emplace_back(std::move(rtp_packet)); } diff --git a/src/rtp/rtp_packetizer/rtp_packetizer_generic.h b/src/rtp/rtp_packetizer/rtp_packetizer_generic.h index 5f19f56..fed0cdb 100644 --- a/src/rtp/rtp_packetizer/rtp_packetizer_generic.h +++ b/src/rtp/rtp_packetizer/rtp_packetizer_generic.h @@ -15,10 +15,16 @@ class RtpPacketizerGeneric : public RtpPacketizer { virtual ~RtpPacketizerGeneric(); - std::vector> Build( + std::vector> Build( uint8_t* payload, uint32_t payload_size, int64_t capture_timestamp_ms, bool use_rtp_packet_to_send) override; + std::vector> BuildPadding( + uint32_t payload_size, int64_t capture_timestamp_ms, + bool use_rtp_packet_to_send) override { + return std::vector>{}; + }; + private: void AddAbsSendTimeExtension(std::vector& rtp_packet_frame); diff --git a/src/rtp/rtp_packetizer/rtp_packetizer_h264.cpp b/src/rtp/rtp_packetizer/rtp_packetizer_h264.cpp index 7b61b4d..fe1cd30 100644 --- a/src/rtp/rtp_packetizer/rtp_packetizer_h264.cpp +++ b/src/rtp/rtp_packetizer/rtp_packetizer_h264.cpp @@ -60,7 +60,7 @@ void RtpPacketizerH264::AddAbsSendTimeExtension( rtp_packet_frame.push_back(abs_send_time & 0xFF); } -std::vector> RtpPacketizerH264::Build( +std::vector> RtpPacketizerH264::Build( uint8_t* payload, uint32_t payload_size, int64_t capture_timestamp_ms, bool use_rtp_packet_to_send) { if (payload_size <= MAX_NALU_LEN) { @@ -72,10 +72,10 @@ std::vector> RtpPacketizerH264::Build( } } -std::vector> RtpPacketizerH264::BuildNalu( +std::vector> RtpPacketizerH264::BuildNalu( uint8_t* payload, uint32_t payload_size, int64_t capture_timestamp_ms, bool use_rtp_packet_to_send) { - std::vector> rtp_packets; + std::vector> rtp_packets; version_ = kRtpVersion; has_padding_ = false; @@ -128,12 +128,12 @@ std::vector> RtpPacketizerH264::BuildNalu( payload + payload_size); if (use_rtp_packet_to_send) { - std::shared_ptr rtp_packet = - std::make_shared(); + std::unique_ptr rtp_packet = + std::make_unique(); rtp_packet->Build(rtp_packet_frame_.data(), rtp_packet_frame_.size()); rtp_packets.emplace_back(std::move(rtp_packet)); } else { - std::shared_ptr rtp_packet = std::make_shared(); + std::unique_ptr rtp_packet = std::make_unique(); rtp_packet->Build(rtp_packet_frame_.data(), rtp_packet_frame_.size()); rtp_packets.emplace_back(std::move(rtp_packet)); } @@ -141,10 +141,10 @@ std::vector> RtpPacketizerH264::BuildNalu( return rtp_packets; } -std::vector> RtpPacketizerH264::BuildFua( +std::vector> RtpPacketizerH264::BuildFua( uint8_t* payload, uint32_t payload_size, int64_t capture_timestamp_ms, bool use_rtp_packet_to_send) { - std::vector> rtp_packets; + std::vector> rtp_packets; uint32_t last_packet_size = payload_size % MAX_NALU_LEN; uint32_t packet_num = @@ -227,12 +227,12 @@ std::vector> RtpPacketizerH264::BuildFua( } if (use_rtp_packet_to_send) { - std::shared_ptr rtp_packet = - std::make_shared(); + std::unique_ptr rtp_packet = + std::make_unique(); rtp_packet->Build(rtp_packet_frame_.data(), rtp_packet_frame_.size()); rtp_packets.emplace_back(std::move(rtp_packet)); } else { - std::shared_ptr rtp_packet = std::make_shared(); + std::unique_ptr rtp_packet = std::make_unique(); rtp_packet->Build(rtp_packet_frame_.data(), rtp_packet_frame_.size()); rtp_packets.emplace_back(std::move(rtp_packet)); } @@ -241,6 +241,65 @@ std::vector> RtpPacketizerH264::BuildFua( return rtp_packets; } +std::vector> RtpPacketizerH264::BuildPadding( + uint32_t payload_size, int64_t capture_timestamp_ms, + bool use_rtp_packet_to_send) { + std::vector> rtp_packets; + + version_ = kRtpVersion; + has_padding_ = true; + has_extension_ = true; + csrc_count_ = 0; + marker_ = 0; + uint8_t payload_type = rtp::PAYLOAD_TYPE(payload_type_ - 1); + sequence_number_++; + timestamp_ = kMsToRtpTimestamp * static_cast(capture_timestamp_ms); + + rtp_packet_frame_.clear(); + rtp_packet_frame_.push_back((version_ << 6) | (has_padding_ << 5) | + (has_extension_ << 4) | csrc_count_); + rtp_packet_frame_.push_back((marker_ << 7) | payload_type); + rtp_packet_frame_.push_back((sequence_number_ >> 8) & 0xFF); + rtp_packet_frame_.push_back(sequence_number_ & 0xFF); + rtp_packet_frame_.push_back((timestamp_ >> 24) & 0xFF); + rtp_packet_frame_.push_back((timestamp_ >> 16) & 0xFF); + rtp_packet_frame_.push_back((timestamp_ >> 8) & 0xFF); + rtp_packet_frame_.push_back(timestamp_ & 0xFF); + rtp_packet_frame_.push_back((ssrc_ >> 24) & 0xFF); + rtp_packet_frame_.push_back((ssrc_ >> 16) & 0xFF); + rtp_packet_frame_.push_back((ssrc_ >> 8) & 0xFF); + rtp_packet_frame_.push_back(ssrc_ & 0xFF); + + for (uint32_t index = 0; index < csrc_count_ && !csrcs_.empty(); index++) { + rtp_packet_frame_.push_back((csrcs_[index] >> 24) & 0xFF); + rtp_packet_frame_.push_back((csrcs_[index] >> 16) & 0xFF); + rtp_packet_frame_.push_back((csrcs_[index] >> 8) & 0xFF); + rtp_packet_frame_.push_back(csrcs_[index] & 0xFF); + } + + if (has_extension_) { + AddAbsSendTimeExtension(rtp_packet_frame_); + } + + // Add padding bytes + uint32_t padding_size = payload_size; + rtp_packet_frame_.insert(rtp_packet_frame_.end(), padding_size - 1, 0); + rtp_packet_frame_.push_back(padding_size); + + if (use_rtp_packet_to_send) { + std::unique_ptr rtp_packet = + std::make_unique(); + rtp_packet->Build(rtp_packet_frame_.data(), rtp_packet_frame_.size()); + rtp_packets.emplace_back(std::move(rtp_packet)); + } else { + std::unique_ptr rtp_packet = std::make_unique(); + rtp_packet->Build(rtp_packet_frame_.data(), rtp_packet_frame_.size()); + rtp_packets.emplace_back(std::move(rtp_packet)); + } + + return rtp_packets; +} + // bool BuildFec(uint8_t* payload, uint32_t payload_size) { // uint8_t** fec_packets = // fec_encoder_.Encode((const char*)payload, payload_size); diff --git a/src/rtp/rtp_packetizer/rtp_packetizer_h264.h b/src/rtp/rtp_packetizer/rtp_packetizer_h264.h index 6b0f452..00fccc4 100644 --- a/src/rtp/rtp_packetizer/rtp_packetizer_h264.h +++ b/src/rtp/rtp_packetizer/rtp_packetizer_h264.h @@ -15,19 +15,23 @@ class RtpPacketizerH264 : public RtpPacketizer { virtual ~RtpPacketizerH264(); - std::vector> Build( + std::vector> Build( uint8_t* payload, uint32_t payload_size, int64_t capture_timestamp_ms, bool use_rtp_packet_to_send) override; - std::vector> BuildNalu( + std::vector> BuildNalu( uint8_t* payload, uint32_t payload_size, int64_t capture_timestamp_ms, bool use_rtp_packet_to_send); - std::vector> BuildFua(uint8_t* payload, + std::vector> BuildFua(uint8_t* payload, uint32_t payload_size, int64_t capture_timestamp_ms, bool use_rtp_packet_to_send); + std::vector> BuildPadding( + uint32_t payload_size, int64_t capture_timestamp_ms, + bool use_rtp_packet_to_send) override; + private: bool EncodeH264Fua(RtpPacket& rtp_packet, uint8_t* payload, size_t payload_size); diff --git a/src/transport/ice_transport.cpp b/src/transport/ice_transport.cpp index 7d0bc19..82f18f1 100644 --- a/src/transport/ice_transport.cpp +++ b/src/transport/ice_transport.cpp @@ -110,6 +110,9 @@ void IceTransport::OnIceStateChange(NiceAgent *agent, guint stream_id, if (state == NICE_COMPONENT_STATE_READY || state == NICE_COMPONENT_STATE_CONNECTED) { ice_io_statistics_->Start(); + if (ice_transport_controller_) { + ice_transport_controller_->UpdateNetworkAvaliablity(true); + } } on_ice_status_change_(nice_component_state_to_string(state), @@ -202,6 +205,8 @@ void IceTransport::OnReceiveBuffer(NiceAgent *agent, guint stream_id, // LOG_ERROR("Rtcp packet [{}]", (uint8_t)(buffer[1])); RtcpPacketInfo rtcp_packet_info; ParseRtcpPacket((const uint8_t *)buffer, size, &rtcp_packet_info); + } else if (CheckIsRtpPaddingPacket(buffer, size)) { + // LOG_WARN("Rtp padding packet"); } else { LOG_ERROR("Unknown packet"); } @@ -920,7 +925,22 @@ uint8_t IceTransport::CheckIsRtpPacket(const char *buffer, size_t size) { } uint8_t payload_type = buffer[1] & 0x7F; - if (payload_type >= 96 && payload_type <= 127) { + if (payload_type == 96 || payload_type == 99 || payload_type == 111 || + payload_type == 127) { + return payload_type; + } else { + return 0; + } +} + +uint8_t IceTransport::CheckIsRtpPaddingPacket(const char *buffer, size_t size) { + if (size < 2) { + return 0; + } + + uint8_t payload_type = buffer[1] & 0x7F; + if (payload_type == 95 || payload_type == 98 || payload_type == 110 || + payload_type == 126) { return payload_type; } else { return 0; diff --git a/src/transport/ice_transport.h b/src/transport/ice_transport.h index 658b6db..202d434 100644 --- a/src/transport/ice_transport.h +++ b/src/transport/ice_transport.h @@ -105,6 +105,7 @@ class IceTransport { private: uint8_t CheckIsRtpPacket(const char *buffer, size_t size); + uint8_t CheckIsRtpPaddingPacket(const char *buffer, size_t size); uint8_t CheckIsRtcpPacket(const char *buffer, size_t size); uint8_t CheckIsVideoPacket(const char *buffer, size_t size); uint8_t CheckIsAudioPacket(const char *buffer, size_t size); diff --git a/src/transport/ice_transport_controller.cpp b/src/transport/ice_transport_controller.cpp index 7e97b6a..2ea0190 100644 --- a/src/transport/ice_transport_controller.cpp +++ b/src/transport/ice_transport_controller.cpp @@ -42,6 +42,7 @@ void IceTransportController::Create( std::shared_ptr ice_io_statistics, OnReceiveVideo on_receive_video, OnReceiveAudio on_receive_audio, OnReceiveData on_receive_data, void* user_data) { + ice_agent_ = ice_agent; remote_user_id_ = remote_user_id; on_receive_video_ = on_receive_video; on_receive_audio_ = on_receive_audio; @@ -53,6 +54,16 @@ void IceTransportController::Create( controller_ = std::make_unique(); packet_sender_ = std::make_unique(ice_agent, webrtc_clock_); + packet_sender_->SetPacingRates(DataRate::BitsPerSec(300000), + DataRate::Zero()); + packet_sender_->SetOnSentPacketFunc( + [this](const webrtc::RtpPacketToSend& packet) { + if (ice_agent_) { + ice_agent_->Send((const char*)packet.Buffer().data(), packet.Size()); + OnSentRtpPacket(packet); + } + }); + resolution_adapter_ = std::make_unique(); video_channel_send_ = std::make_unique( @@ -60,6 +71,13 @@ void IceTransportController::Create( [this](const webrtc::RtpPacketToSend& packet) { OnSentRtpPacket(packet); }); + + packet_sender_->SetGeneratePaddingFunc( + [this](uint32_t size, int64_t capture_timestamp_ms) + -> std::vector> { + return video_channel_send_->GeneratePadding(size, capture_timestamp_ms); + }); + audio_channel_send_ = std::make_unique(ice_agent, ice_io_statistics); data_channel_send_ = @@ -69,6 +87,10 @@ void IceTransportController::Create( audio_channel_send_->Initialize(rtp::PAYLOAD_TYPE::OPUS); data_channel_send_->Initialize(rtp::PAYLOAD_TYPE::DATA); + video_channel_send_->SetEnqueuePacketsFunc( + [this](std::vector>& packets) + -> void { packet_sender_->EnqueuePackets(std::move(packets)); }); + std::weak_ptr weak_self = shared_from_this(); video_channel_receive_ = std::make_unique( clock_, ice_agent, ice_io_statistics, @@ -161,6 +183,7 @@ int IceTransportController::SendVideo(const XVideoFrame* video_frame) { [this](std::shared_ptr encoded_frame) -> int { if (video_channel_send_) { video_channel_send_->SendVideo(encoded_frame); + LOG_WARN("SendVideo rtp packets"); } return 0; @@ -205,6 +228,17 @@ int IceTransportController::SendData(const char* data, size_t size) { return 0; } +void IceTransportController::UpdateNetworkAvaliablity(bool network_available) { + if (controller_) { + webrtc::NetworkAvailability msg; + msg.at_time = + webrtc::Timestamp::Millis(webrtc_clock_->TimeInMilliseconds()); + msg.network_available = network_available; + controller_->OnNetworkAvailability(msg); + packet_sender_->EnsureStarted(); + } +} + int IceTransportController::OnReceiveVideoRtpPacket(const char* data, size_t size) { if (video_channel_receive_) { @@ -486,7 +520,7 @@ void IceTransportController::PostUpdates(webrtc::NetworkControlUpdate update) { target_height_.reset(); } video_encoder_->SetTargetBitrate(target_bitrate_); - LOG_WARN("Set target bitrate [{}]bps", target_bitrate_); + // LOG_WARN("Set target bitrate [{}]bps", target_bitrate_); } } diff --git a/src/transport/ice_transport_controller.h b/src/transport/ice_transport_controller.h index a8b2290..5318cd9 100644 --- a/src/transport/ice_transport_controller.h +++ b/src/transport/ice_transport_controller.h @@ -58,6 +58,8 @@ class IceTransportController void FullIntraRequest() { b_force_i_frame_ = true; } + void UpdateNetworkAvaliablity(bool network_available); + int OnReceiveVideoRtpPacket(const char *data, size_t size); int OnReceiveAudioRtpPacket(const char *data, size_t size); int OnReceiveDataRtpPacket(const char *data, size_t size); diff --git a/src/transport/packet_sender.cpp b/src/transport/packet_sender.cpp index 01d767c..8a4561e 100644 --- a/src/transport/packet_sender.cpp +++ b/src/transport/packet_sender.cpp @@ -3,116 +3,244 @@ #include "log.h" +const int PacketSender::kNoPacketHoldback = -1; + PacketSender::PacketSender(std::shared_ptr ice_agent, std::shared_ptr clock) : ice_agent_(ice_agent), clock_(clock), - pacing_controller_(clock.get(), this) {} + pacing_controller_(clock.get(), this), + max_hold_back_window_(webrtc::TimeDelta::Millis(5)), + max_hold_back_window_in_packets_(3), + next_process_time_(webrtc::Timestamp::MinusInfinity()), + is_started_(false), + is_shutdown_(false), + packet_size_(/*alpha=*/0.95), + include_overhead_(false) {} PacketSender::~PacketSender() {} -// int PacketSender::SendPacket(const char *data, size_t size) { -// LOG_INFO("Send packet, size: %d", size); -// return ice_agent_->Send(data, size); -// } +std::vector> +PacketSender::GeneratePadding(webrtc::DataSize size) { + std::vector> to_send_rtp_packets; + std::vector> rtp_packets = + generat_padding_func_(size.bytes(), clock_->CurrentTime().ms()); + // for (auto &packet : rtp_packets) { + // std::unique_ptr rtp_packet_to_send( + // static_cast(packet.release())); + // to_send_rtp_packets.push_back(std::move(rtp_packet_to_send)); + // } -// void PacketSender::CreateProbeClusters( -// std::vector probe_cluster_configs) { -// pacing_controller_.CreateProbeClusters(probe_cluster_configs); -// MaybeScheduleProcessPackets(); -// } + return to_send_rtp_packets; +} -// void PacketSender::MaybeScheduleProcessPackets() { -// if (!processing_packets_) -// MaybeProcessPackets(webrtc::Timestamp::MinusInfinity()); -// } +void PacketSender::SetSendBurstInterval(webrtc::TimeDelta burst_interval) { + pacing_controller_.SetSendBurstInterval(burst_interval); +} -// void PacketSender::MaybeProcessPackets( -// webrtc::Timestamp scheduled_process_time) { -// if (is_shutdown_ || !is_started_) { -// return; -// } +void PacketSender::SetAllowProbeWithoutMediaPacket(bool allow) { + pacing_controller_.SetAllowProbeWithoutMediaPacket(allow); +} -// // Protects against re-entry from transport feedback calling into the task -// // queue pacer. -// processing_packets_ = true; -// auto cleanup = std::unique_ptr>( -// nullptr, [this](void *) { processing_packets_ = false; }); +void PacketSender::EnsureStarted() { + is_started_ = true; + MaybeProcessPackets(webrtc::Timestamp::MinusInfinity()); +} -// webrtc::Timestamp next_send_time = pacing_controller_.NextSendTime(); -// const webrtc::Timestamp now = clock_->CurrentTime(); -// webrtc::TimeDelta early_execute_margin = -// pacing_controller_.IsProbing() -// ? webrtc::PacingController::kMaxEarlyProbeProcessing -// : webrtc::TimeDelta::Zero(); +void PacketSender::Pause() { pacing_controller_.Pause(); } -// // Process packets and update stats. -// while (next_send_time <= now + early_execute_margin) { -// pacing_controller_.ProcessPackets(); -// next_send_time = pacing_controller_.NextSendTime(); +void PacketSender::Resume() { + pacing_controller_.Resume(); + MaybeProcessPackets(webrtc::Timestamp::MinusInfinity()); +} -// // Probing state could change. Get margin after process packets. -// early_execute_margin = -// pacing_controller_.IsProbing() -// ? webrtc::PacingController::kMaxEarlyProbeProcessing -// : webrtc::TimeDelta::Zero(); -// } -// UpdateStats(); +void PacketSender::SetCongested(bool congested) { + pacing_controller_.SetCongested(congested); + MaybeScheduleProcessPackets(); +} -// // Ignore retired scheduled task, otherwise reset `next_process_time_`. -// if (scheduled_process_time.IsFinite()) { -// if (scheduled_process_time != next_process_time_) { -// return; -// } -// next_process_time_ = webrtc::Timestamp::MinusInfinity(); -// } +void PacketSender::SetPacingRates(webrtc::DataRate pacing_rate, + webrtc::DataRate padding_rate) { + pacing_controller_.SetPacingRates(pacing_rate, padding_rate); + MaybeScheduleProcessPackets(); +} -// // Do not hold back in probing. -// webrtc::TimeDelta hold_back_window = webrtc::TimeDelta::Zero(); -// if (!pacing_controller_.IsProbing()) { -// hold_back_window = max_hold_back_window_; -// webrtc::DataRate pacing_rate = pacing_controller_.pacing_rate(); -// if (max_hold_back_window_in_packets_ != kNoPacketHoldback && -// !pacing_rate.IsZero() && -// packet_size_.filtered() != rtc::ExpFilter::kValueUndefined) { -// webrtc::TimeDelta avg_packet_send_time = -// webrtc::DataSize::Bytes(packet_size_.filtered()) / pacing_rate; -// hold_back_window = -// std::min(hold_back_window, -// avg_packet_send_time * max_hold_back_window_in_packets_); -// } -// } +void PacketSender::EnqueuePackets( + std::vector> packets) { + // task_queue_->PostTask() + for (auto &packet : packets) { + size_t packet_size = packet->payload_size() + packet->padding_size(); + if (include_overhead_) { + packet_size += packet->headers_size(); + } + packet_size_.Apply(1, packet_size); + pacing_controller_.EnqueuePacket(std::move(packet)); + } + MaybeProcessPackets(webrtc::Timestamp::MinusInfinity()); +} -// // Calculate next process time. -// webrtc::TimeDelta time_to_next_process = -// std::max(hold_back_window, next_send_time - now - -// early_execute_margin); -// next_send_time = now + time_to_next_process; +void PacketSender::RemovePacketsForSsrc(uint32_t ssrc) { + // task_queue_->PostTask(SafeTask(safety_.flag(), [this, ssrc] { + pacing_controller_.RemovePacketsForSsrc(ssrc); + MaybeProcessPackets(webrtc::Timestamp::MinusInfinity()); + // })); +} -// // If no in flight task or in flight task is later than `next_send_time`, -// // schedule a new one. Previous in flight task will be retired. -// if (next_process_time_.IsMinusInfinity() || -// next_process_time_ > next_send_time) { -// // Prefer low precision if allowed and not probing. -// task_queue_->PostDelayedHighPrecisionTask( -// SafeTask( -// safety_.flag(), -// [this, next_send_time]() { MaybeProcessPackets(next_send_time); -// }), -// time_to_next_process.RoundUpTo(webrtc::TimeDelta::Millis(1))); -// next_process_time_ = next_send_time; -// } -// } +void PacketSender::SetAccountForAudioPackets(bool account_for_audio) { + pacing_controller_.SetAccountForAudioPackets(account_for_audio); + MaybeProcessPackets(webrtc::Timestamp::MinusInfinity()); +} -// void PacketSender::UpdateStats() { -// Stats new_stats; -// new_stats.expected_queue_time = pacing_controller_.ExpectedQueueTime(); -// new_stats.first_sent_packet_time = -// pacing_controller_.FirstSentPacketTime(); -// new_stats.oldest_packet_enqueue_time = -// pacing_controller_.OldestPacketEnqueueTime(); -// new_stats.queue_size = pacing_controller_.QueueSizeData(); -// OnStatsUpdated(new_stats); -// } +void PacketSender::SetIncludeOverhead() { + include_overhead_ = true; + pacing_controller_.SetIncludeOverhead(); + MaybeProcessPackets(webrtc::Timestamp::MinusInfinity()); +} -// PacketSender::Stats PacketSender::GetStats() const { return current_stats_; } \ No newline at end of file +void PacketSender::SetTransportOverhead(webrtc::DataSize overhead_per_packet) { + pacing_controller_.SetTransportOverhead(overhead_per_packet); + MaybeProcessPackets(webrtc::Timestamp::MinusInfinity()); +} + +void PacketSender::SetQueueTimeLimit(webrtc::TimeDelta limit) { + pacing_controller_.SetQueueTimeLimit(limit); + MaybeProcessPackets(webrtc::Timestamp::MinusInfinity()); +} + +webrtc::TimeDelta PacketSender::ExpectedQueueTime() const { + return GetStats().expected_queue_time; +} + +webrtc::DataSize PacketSender::QueueSizeData() const { + return GetStats().queue_size; +} + +std::optional PacketSender::FirstSentPacketTime() const { + return GetStats().first_sent_packet_time; +} + +webrtc::TimeDelta PacketSender::OldestPacketWaitTime() const { + webrtc::Timestamp oldest_packet = GetStats().oldest_packet_enqueue_time; + if (oldest_packet.IsInfinite()) { + return webrtc::TimeDelta::Zero(); + } + + // (webrtc:9716): The clock is not always monotonic. + webrtc::Timestamp current = clock_->CurrentTime(); + if (current < oldest_packet) { + return webrtc::TimeDelta::Zero(); + } + + return current - oldest_packet; +} + +void PacketSender::CreateProbeClusters( + std::vector probe_cluster_configs) { + pacing_controller_.CreateProbeClusters(probe_cluster_configs); + MaybeScheduleProcessPackets(); +} + +void PacketSender::OnStatsUpdated(const Stats &stats) { + current_stats_ = stats; +} + +void PacketSender::MaybeScheduleProcessPackets() { + LOG_ERROR("x1"); + if (!processing_packets_) { + LOG_ERROR("x2"); + MaybeProcessPackets(webrtc::Timestamp::MinusInfinity()); + } +} + +void PacketSender::MaybeProcessPackets( + webrtc::Timestamp scheduled_process_time) { + if (is_shutdown_ || !is_started_) { + LOG_ERROR("shutdown {}, started {}", is_shutdown_, is_started_); + return; + } + + // Protects against re-entry from transport feedback calling into the task + // queue pacer. + processing_packets_ = true; + // auto cleanup = std::unique_ptr>( + // nullptr, [this](void *) { processing_packets_ = false; }); + + webrtc::Timestamp next_send_time = pacing_controller_.NextSendTime(); + const webrtc::Timestamp now = clock_->CurrentTime(); + webrtc::TimeDelta early_execute_margin = + pacing_controller_.IsProbing() + ? webrtc::PacingController::kMaxEarlyProbeProcessing + : webrtc::TimeDelta::Zero(); + + // Process packets and update stats. + while (next_send_time <= now + early_execute_margin) { + pacing_controller_.ProcessPackets(); + next_send_time = pacing_controller_.NextSendTime(); + + // Probing state could change. Get margin after process packets. + early_execute_margin = + pacing_controller_.IsProbing() + ? webrtc::PacingController::kMaxEarlyProbeProcessing + : webrtc::TimeDelta::Zero(); + } + + UpdateStats(); + + // Ignore retired scheduled task, otherwise reset `next_process_time_`. + if (scheduled_process_time.IsFinite()) { + if (scheduled_process_time != next_process_time_) { + return; + } + next_process_time_ = webrtc::Timestamp::MinusInfinity(); + } + + // Do not hold back in probing. + webrtc::TimeDelta hold_back_window = webrtc::TimeDelta::Zero(); + if (!pacing_controller_.IsProbing()) { + hold_back_window = max_hold_back_window_; + webrtc::DataRate pacing_rate = pacing_controller_.pacing_rate(); + if (max_hold_back_window_in_packets_ != kNoPacketHoldback && + !pacing_rate.IsZero() && + packet_size_.filtered() != rtc::ExpFilter::kValueUndefined) { + webrtc::TimeDelta avg_packet_send_time = + webrtc::DataSize::Bytes(packet_size_.filtered()) / pacing_rate; + hold_back_window = + std::min(hold_back_window, + avg_packet_send_time * max_hold_back_window_in_packets_); + } + } + + // Calculate next process time. + webrtc::TimeDelta time_to_next_process = + std::max(hold_back_window, next_send_time - now - early_execute_margin); + next_send_time = now + time_to_next_process; + + // If no in flight task or in flight task is later than `next_send_time`, + // schedule a new one. Previous in flight task will be retired. + if (next_process_time_.IsMinusInfinity() || + next_process_time_ > next_send_time) { + // Prefer low precision if allowed and not probing. + // task_queue_->PostDelayedHighPrecisionTask( + // SafeTask( + // safety_.flag(), + // [this, next_send_time]() { MaybeProcessPackets(next_send_time); + // }), + MaybeProcessPackets(next_send_time); + time_to_next_process.RoundUpTo(webrtc::TimeDelta::Millis(1)); + next_process_time_ = next_send_time; + } + + processing_packets_ = false; +} + +void PacketSender::UpdateStats() { + Stats new_stats; + new_stats.expected_queue_time = pacing_controller_.ExpectedQueueTime(); + new_stats.first_sent_packet_time = pacing_controller_.FirstSentPacketTime(); + new_stats.oldest_packet_enqueue_time = + pacing_controller_.OldestPacketEnqueueTime(); + new_stats.queue_size = pacing_controller_.QueueSizeData(); + OnStatsUpdated(new_stats); +} + +PacketSender::Stats PacketSender::GetStats() const { return current_stats_; } \ No newline at end of file diff --git a/src/transport/packet_sender.h b/src/transport/packet_sender.h index 1291357..69761ba 100644 --- a/src/transport/packet_sender.h +++ b/src/transport/packet_sender.h @@ -16,6 +16,7 @@ #include "api/units/time_delta.h" #include "api/units/timestamp.h" #include "ice_agent.h" +#include "log.h" #include "pacing_controller.h" #include "rtc_base/numerics/exp_filter.h" #include "rtp_packet_pacer.h" @@ -24,74 +25,38 @@ class PacketSender : public webrtc::RtpPacketPacer, public webrtc::PacingController::PacketSender { public: + static const int kNoPacketHoldback; + PacketSender(std::shared_ptr ice_agent, std::shared_ptr clock); ~PacketSender(); - int SendPacket(const char* data, size_t size); - - public: - void CreateProbeClusters( - std::vector probe_cluster_configs) override{}; - - // Temporarily pause all sending. - void Pause() override{}; - - // Resume sending packets. - void Resume() override{}; - - void SetCongested(bool congested) override{}; - - // Sets the pacing rates. Must be called once before packets can be sent. - void SetPacingRates(webrtc::DataRate pacing_rate, - webrtc::DataRate padding_rate) override{}; - - // Time since the oldest packet currently in the queue was added. - webrtc::TimeDelta OldestPacketWaitTime() const override { - return webrtc::TimeDelta::Zero(); - }; - - // Sum of payload + padding bytes of all packets currently in the pacer queue. - webrtc::DataSize QueueSizeData() const override { - return webrtc::DataSize::Zero(); - }; - - // Returns the time when the first packet was sent. - std::optional FirstSentPacketTime() const override { - return {}; + void SetOnSentPacketFunc( + std::function on_sent_packet_func) { + on_sent_packet_func_ = on_sent_packet_func; } - // Returns the expected number of milliseconds it will take to send the - // current packets in the queue, given the current size and bitrate, ignoring - // priority. - webrtc::TimeDelta ExpectedQueueTime() const override { - return webrtc::TimeDelta::Zero(); - }; - - // Set the average upper bound on pacer queuing delay. The pacer may send at - // a higher rate than what was configured via SetPacingRates() in order to - // keep ExpectedQueueTimeMs() below `limit_ms` on average. - void SetQueueTimeLimit(webrtc::TimeDelta limit) override{}; - - // Currently audio traffic is not accounted by pacer and passed through. - // With the introduction of audio BWE audio traffic will be accounted for - // the pacer budget calculation. The audio traffic still will be injected - // at high priority. - void SetAccountForAudioPackets(bool account_for_audio) override{}; - void SetIncludeOverhead() override{}; - void SetTransportOverhead(webrtc::DataSize overhead_per_packet) override{}; + void SetGeneratePaddingFunc( + std::function>(uint32_t, int64_t)> + generat_padding_func) { + generat_padding_func_ = generat_padding_func; + } public: void SendPacket(std::unique_ptr packet, - const webrtc::PacedPacketInfo& cluster_info) override {} + const webrtc::PacedPacketInfo& cluster_info) override { + if (on_sent_packet_func_) { + on_sent_packet_func_(*packet); + } + } // Should be called after each call to SendPacket(). std::vector> FetchFec() override { - return {}; + std::vector> fec_packets; + return fec_packets; } std::vector> GeneratePadding( - webrtc::DataSize size) override { - return {}; - } + webrtc::DataSize size) override; + // TODO(bugs.webrtc.org/1439830): Make pure once subclasses adapt. void OnBatchComplete() override {} @@ -105,12 +70,133 @@ class PacketSender : public webrtc::RtpPacketPacer, return std::nullopt; } + public: + void SetSendBurstInterval(webrtc::TimeDelta burst_interval); + + // A probe may be sent without first waing for a media packet. + void SetAllowProbeWithoutMediaPacket(bool allow); + + // Ensure that necessary delayed tasks are scheduled. + void EnsureStarted(); + + // Methods implementing RtpPacketSender. + + // Adds the packet to the queue and calls + // PacingController::PacketSender::SendPacket() when it's time to send. + void EnqueuePackets( + std::vector> packets); + // Remove any pending packets matching this SSRC from the packet queue. + void RemovePacketsForSsrc(uint32_t ssrc); + + void CreateProbeClusters( + std::vector probe_cluster_configs) override; + + // Temporarily pause all sending. + void Pause() override; + + // Resume sending packets. + void Resume() override; + + void SetCongested(bool congested) override; + + // Sets the pacing rates. Must be called once before packets can be sent. + void SetPacingRates(webrtc::DataRate pacing_rate, + webrtc::DataRate padding_rate) override; + + // Currently audio traffic is not accounted for by pacer and passed through. + // With the introduction of audio BWE, audio traffic will be accounted for + // in the pacer budget calculation. The audio traffic will still be injected + // at high priority. + void SetAccountForAudioPackets(bool account_for_audio) override; + + void SetIncludeOverhead() override; + void SetTransportOverhead(webrtc::DataSize overhead_per_packet) override; + + // Time since the oldest packet currently in the queue was added. + webrtc::TimeDelta OldestPacketWaitTime() const override; + + // Sum of payload + padding bytes of all packets currently in the pacer queue. + webrtc::DataSize QueueSizeData() const override; + + // Returns the time when the first packet was sent. + std::optional FirstSentPacketTime() const override; + + // Returns the expected number of milliseconds it will take to send the + // current packets in the queue, given the current size and bitrate, ignoring + // priority. + webrtc::TimeDelta ExpectedQueueTime() const override; + + // Set the average upper bound on pacer queuing delay. The pacer may send at + // a higher rate than what was configured via SetPacingRates() in order to + // keep ExpectedQueueTimeMs() below `limit_ms` on average. + void SetQueueTimeLimit(webrtc::TimeDelta limit) override; + + protected: + // Exposed as protected for test. + struct Stats { + Stats() + : oldest_packet_enqueue_time(webrtc::Timestamp::MinusInfinity()), + queue_size(webrtc::DataSize::Zero()), + expected_queue_time(webrtc::TimeDelta::Zero()) {} + webrtc::Timestamp oldest_packet_enqueue_time; + webrtc::DataSize queue_size; + webrtc::TimeDelta expected_queue_time; + std::optional first_sent_packet_time; + }; + void OnStatsUpdated(const Stats& stats); + + private: + // Call in response to state updates that could warrant sending out packets. + // Protected against re-entry from packet sent receipts. + void MaybeScheduleProcessPackets(); + // Check if it is time to send packets, or schedule a delayed task if not. + // Use Timestamp::MinusInfinity() to indicate that this call has _not_ + // been scheduled by the pacing controller. If this is the case, check if we + // can execute immediately otherwise schedule a delay task that calls this + // method again with desired (finite) scheduled process time. + void MaybeProcessPackets(webrtc::Timestamp scheduled_process_time); + + void UpdateStats(); + Stats GetStats() const; + private: std::shared_ptr ice_agent_ = nullptr; webrtc::PacingController pacing_controller_; + std::function on_sent_packet_func_ = + nullptr; + + std::function>(uint32_t, int64_t)> + generat_padding_func_ = nullptr; private: std::shared_ptr clock_ = nullptr; + + private: + const webrtc::TimeDelta max_hold_back_window_; + const int max_hold_back_window_in_packets_; + // We want only one (valid) delayed process task in flight at a time. + // If the value of `next_process_time_` is finite, it is an id for a + // delayed task that will call MaybeProcessPackets() with that time + // as parameter. + // Timestamp::MinusInfinity() indicates no valid pending task. + webrtc::Timestamp next_process_time_; + + // Indicates if this task queue is started. If not, don't allow + // posting delayed tasks yet. + bool is_started_; + + // Indicates if this task queue is shutting down. If so, don't allow + // posting any more delayed tasks as that can cause the task queue to + // never drain. + bool is_shutdown_; + + // Filtered size of enqueued packets, in bytes. + rtc::ExpFilter packet_size_; + bool include_overhead_; + + Stats current_stats_; + // Protects against ProcessPackets reentry from packet sent receipts. + bool processing_packets_ = false; }; #endif \ No newline at end of file