diff --git a/src/media/nvcodec/NvDecoder.cpp b/src/media/nvcodec/NvDecoder.cpp index 42f3e60..dfa10ef 100644 --- a/src/media/nvcodec/NvDecoder.cpp +++ b/src/media/nvcodec/NvDecoder.cpp @@ -901,7 +901,7 @@ uint8_t *NvDecoder::GetFrame(int64_t *pTimestamp) { uint8_t *NvDecoder::GetLockedFrame(int64_t *pTimestamp) { uint8_t *pFrame; - uint64_t timestamp; + uint32_t timestamp; if (m_nDecodedFrame > 0) { std::lock_guard lock(m_mtxVPFrame); m_nDecodedFrame--; @@ -924,7 +924,7 @@ void NvDecoder::UnlockFrame(uint8_t **pFrame) { m_vpFrame.insert(m_vpFrame.end(), &pFrame[0], &pFrame[1]); // add a dummy entry for timestamp - uint64_t timestamp[2] = {0}; + uint32_t timestamp[2] = {0}; m_vTimestamp.insert(m_vTimestamp.end(), ×tamp[0], ×tamp[1]); } #pragma warning(pop) \ No newline at end of file diff --git a/src/qos/nack_requester.cc b/src/qos/nack_requester.cc index a2890c5..7648e77 100644 --- a/src/qos/nack_requester.cc +++ b/src/qos/nack_requester.cc @@ -56,6 +56,15 @@ int NackRequester::OnReceivedPacket(uint16_t seq_num) { return OnReceivedPacket(seq_num, false); } +void NackRequester::ProcessNacks() { + std::vector nack_batch = GetNackBatch(kTimeOnly); + if (!nack_batch.empty()) { + // This batch of NACKs is triggered externally; there is no external + // initiator who can batch them with other feedback messages. + nack_sender_->SendNack(nack_batch, /*buffering_allowed=*/false); + } +} + int NackRequester::OnReceivedPacket(uint16_t seq_num, bool is_recovered) { bool is_retransmitted = true; diff --git a/src/qos/nack_requester.h b/src/qos/nack_requester.h index efb840f..2d0e040 100644 --- a/src/qos/nack_requester.h +++ b/src/qos/nack_requester.h @@ -48,6 +48,8 @@ class NackRequester { int OnReceivedPacket(uint16_t seq_num); int OnReceivedPacket(uint16_t seq_num, bool is_recovered); + void ProcessNacks(); + private: void ClearUpTo(uint16_t seq_num); void UpdateRtt(int64_t rtt_ms); diff --git a/src/rtp/rtp_packet/rtp_codec.cc b/src/rtp/rtp_packet/rtp_codec.cc index e24c922..11bbc33 100644 --- a/src/rtp/rtp_packet/rtp_codec.cc +++ b/src/rtp/rtp_packet/rtp_codec.cc @@ -547,7 +547,7 @@ void RtpCodec::Encode(uint8_t* buffer, uint32_t size, } } else if (rtp::PAYLOAD_TYPE::AV1 == payload_type_) { std::vector obus = ParseObus(buffer, size); - uint64_t timestamp = + uint32_t timestamp = std::chrono::duration_cast( std::chrono::system_clock::now().time_since_epoch()) .count(); diff --git a/src/rtp/rtp_packet/rtp_codec.hxx b/src/rtp/rtp_packet/rtp_codec.hxx index 755d94a..78a77ff 100644 --- a/src/rtp/rtp_packet/rtp_codec.hxx +++ b/src/rtp/rtp_packet/rtp_codec.hxx @@ -40,7 +40,7 @@ class RtpCodec { bool marker_ = false; uint32_t payload_type_ = 0; uint16_t sequence_number_ = 0; - uint64_t timestamp_ = 0; + uint32_t timestamp_ = 0; uint32_t ssrc_ = 0; std::vector csrcs_; uint16_t profile_ = 0; diff --git a/src/rtp/rtp_packet/rtp_header.h b/src/rtp/rtp_packet/rtp_header.h index 1837be6..d3a42e1 100644 --- a/src/rtp/rtp_packet/rtp_header.h +++ b/src/rtp/rtp_packet/rtp_header.h @@ -52,7 +52,7 @@ struct RTPHeader { bool marker_ = false; uint8_t payload_type_ = 0; uint16_t sequence_number_ = 1; - uint64_t timestamp_ = 0; + uint32_t timestamp_ = 0; uint32_t ssrc_ = 0; uint32_t csrcs_[kMaxRtpCsrcSize]; size_t padding_len; diff --git a/src/rtp/rtp_packet/rtp_packet.h b/src/rtp/rtp_packet/rtp_packet.h index c3eb1d1..c14ceca 100644 --- a/src/rtp/rtp_packet/rtp_packet.h +++ b/src/rtp/rtp_packet/rtp_packet.h @@ -204,7 +204,7 @@ class RtpPacket { void SetSequenceNumber(uint16_t sequence_number) { sequence_number_ = sequence_number; } - void SetTimestamp(uint64_t timestamp) { timestamp_ = timestamp; } + void SetTimestamp(uint32_t timestamp) { timestamp_ = timestamp; } void SetSsrc(uint32_t ssrc) { ssrc_ = ssrc; } void SetCsrcs(std::vector &csrcs) { csrcs_ = csrcs; } void SetSize(size_t size) { size_ = size; } @@ -297,7 +297,7 @@ class RtpPacket { bool marker_ = false; uint8_t payload_type_ = 0; uint16_t sequence_number_ = 1; - uint64_t timestamp_ = 0; + uint32_t timestamp_ = 0; uint32_t ssrc_ = 0; std::vector csrcs_; diff --git a/src/rtp/rtp_packet/rtp_packet.x b/src/rtp/rtp_packet/rtp_packet.x index 75e08e4..14492ab 100644 --- a/src/rtp/rtp_packet/rtp_packet.x +++ b/src/rtp/rtp_packet/rtp_packet.x @@ -208,7 +208,7 @@ class RtpPacket { void SetSequenceNumber(uint16_t sequence_number) { sequence_number_ = sequence_number; } - void SetTimestamp(uint64_t timestamp) { timestamp_ = timestamp; } + void SetTimestamp(uint32_t timestamp) { timestamp_ = timestamp; } void SetSsrc(uint32_t ssrc) { ssrc_ = ssrc; } void SetCsrcs(std::vector &csrcs) { csrcs_ = csrcs; } @@ -453,7 +453,7 @@ class RtpPacket { bool marker_ = false; uint8_t payload_type_ = 0; uint16_t sequence_number_ = 1; - uint64_t timestamp_ = 0; + uint32_t timestamp_ = 0; uint32_t ssrc_ = 0; std::vector csrcs_; uint16_t profile_ = 0; diff --git a/src/rtp/rtp_packetizer/rtp_packetizer_av1.h b/src/rtp/rtp_packetizer/rtp_packetizer_av1.h index f938edd..5c18db8 100644 --- a/src/rtp/rtp_packetizer/rtp_packetizer_av1.h +++ b/src/rtp/rtp_packetizer/rtp_packetizer_av1.h @@ -33,7 +33,7 @@ class RtpPacketizerAv1 : public RtpPacketizer { bool marker_; uint32_t payload_type_; uint16_t sequence_number_; - uint64_t timestamp_; + uint32_t timestamp_; uint32_t ssrc_; std::vector csrcs_; uint16_t profile_; diff --git a/src/rtp/rtp_packetizer/rtp_packetizer_generic.cpp b/src/rtp/rtp_packetizer/rtp_packetizer_generic.cpp index c8a4668..86573a0 100644 --- a/src/rtp/rtp_packetizer/rtp_packetizer_generic.cpp +++ b/src/rtp/rtp_packetizer/rtp_packetizer_generic.cpp @@ -54,7 +54,7 @@ std::vector> RtpPacketizerGeneric::Build( payload_size / MAX_NALU_LEN + (last_packet_size ? 1 : 0); // TODO: use frame timestamp - uint64_t timestamp = std::chrono::duration_cast( + uint32_t timestamp = std::chrono::duration_cast( std::chrono::system_clock::now().time_since_epoch()) .count(); diff --git a/src/rtp/rtp_packetizer/rtp_packetizer_generic.h b/src/rtp/rtp_packetizer/rtp_packetizer_generic.h index 869c380..decb788 100644 --- a/src/rtp/rtp_packetizer/rtp_packetizer_generic.h +++ b/src/rtp/rtp_packetizer/rtp_packetizer_generic.h @@ -36,7 +36,7 @@ class RtpPacketizerGeneric : public RtpPacketizer { bool marker_; uint32_t payload_type_; uint16_t sequence_number_; - uint64_t timestamp_; + uint32_t timestamp_; uint32_t ssrc_; std::vector csrcs_; uint16_t profile_; diff --git a/src/rtp/rtp_packetizer/rtp_packetizer_h264.cpp b/src/rtp/rtp_packetizer/rtp_packetizer_h264.cpp index 622f3a7..993cf32 100644 --- a/src/rtp/rtp_packetizer/rtp_packetizer_h264.cpp +++ b/src/rtp/rtp_packetizer/rtp_packetizer_h264.cpp @@ -82,7 +82,13 @@ std::vector> RtpPacketizerH264::BuildNalu( marker_ = 1; payload_type_ = rtp::PAYLOAD_TYPE(payload_type_); sequence_number_++; - timestamp_ = rtp::kMsToRtpTimestamp * rtp_timestamp; + + // TODO: use frame timestamp + uint32_t timestamp = std::chrono::duration_cast( + std::chrono::system_clock::now().time_since_epoch()) + .count(); + + timestamp_ = timestamp; if (!csrc_count_) { } @@ -149,7 +155,7 @@ std::vector> RtpPacketizerH264::BuildFua( payload_size / MAX_NALU_LEN + (last_packet_size ? 1 : 0); // TODO: use frame timestamp - uint64_t timestamp = std::chrono::duration_cast( + uint32_t timestamp = std::chrono::duration_cast( std::chrono::system_clock::now().time_since_epoch()) .count(); @@ -228,6 +234,7 @@ std::vector> RtpPacketizerH264::BuildFua( std::unique_ptr rtp_packet = std::make_unique(); rtp_packet->Build(rtp_packet_frame_.data(), rtp_packet_frame_.size()); + rtp_packet->Build(rtp_packet_frame_.data(), rtp_packet_frame_.size()); rtp_packets.emplace_back(std::move(rtp_packet)); } else { std::unique_ptr rtp_packet = std::make_unique(); diff --git a/src/rtp/rtp_packetizer/rtp_packetizer_h264.h b/src/rtp/rtp_packetizer/rtp_packetizer_h264.h index ad96d17..ebfff6b 100644 --- a/src/rtp/rtp_packetizer/rtp_packetizer_h264.h +++ b/src/rtp/rtp_packetizer/rtp_packetizer_h264.h @@ -45,7 +45,7 @@ class RtpPacketizerH264 : public RtpPacketizer { bool marker_; uint32_t payload_type_; uint16_t sequence_number_; - uint64_t timestamp_; + uint32_t timestamp_; uint32_t ssrc_; std::vector csrcs_; uint16_t profile_; diff --git a/src/transport/channel/rtp_video_receiver.cpp b/src/transport/channel/rtp_video_receiver.cpp index 34bf9b4..3083272 100644 --- a/src/transport/channel/rtp_video_receiver.cpp +++ b/src/transport/channel/rtp_video_receiver.cpp @@ -11,7 +11,8 @@ #define NV12_BUFFER_SIZE (1280 * 720 * 3 / 2) #define RTCP_RR_INTERVAL 1000 -#define MAX_WAIT_TIME_MS 20 // 20ms +#define MAX_WAIT_TIME_MS 20 // 20ms +#define NACK_UPDATE_INTERVAL 20 // 20ms RtpVideoReceiver::RtpVideoReceiver(std::shared_ptr clock) : ssrc_(GenerateUniqueSsrc()), @@ -148,11 +149,6 @@ void RtpVideoReceiver::InsertRtpPacket(RtpPacket& rtp_packet) { file_rtp_recv_); #endif - receive_side_congestion_controller_.OnReceivedPacket(rtp_packet_received, - MediaType::VIDEO); - - nack_->OnReceivedPacket(rtp_packet.SequenceNumber()); - last_recv_bytes_ = (uint32_t)rtp_packet.PayloadSize(); total_rtp_payload_recv_ += (uint32_t)rtp_packet.PayloadSize(); total_rtp_packets_recv_++; @@ -167,6 +163,13 @@ void RtpVideoReceiver::InsertRtpPacket(RtpPacket& rtp_packet) { io_statistics_->UpdateVideoPacketLossCount(rtp_packet.SequenceNumber()); } + uint32_t now_ts = static_cast( + std::chrono::duration_cast( + std::chrono::system_clock::now().time_since_epoch()) + .count()); + + CheckIsTimeUpdateNack(now_ts); + // if (CheckIsTimeSendRR()) { // ReceiverReport rtcp_rr; // RtcpReportBlock report; @@ -208,14 +211,20 @@ void RtpVideoReceiver::InsertRtpPacket(RtpPacket& rtp_packet) { RtpPacketH264 rtp_packet_h264; if (rtp_packet_h264.Build(rtp_packet.Buffer().data(), rtp_packet.Size())) { rtp_packet_h264.GetFrameHeaderInfo(); - ProcessH264RtpPacket(rtp_packet_h264); - } else { - LOG_ERROR("Invalid h264 rtp packet"); + bool is_missing_packet = ProcessH264RtpPacket(rtp_packet_h264); + if (!is_missing_packet) { + receive_side_congestion_controller_.OnReceivedPacket( + rtp_packet_received, MediaType::VIDEO); + nack_->OnReceivedPacket(rtp_packet.SequenceNumber(), true); + } else { + nack_->OnReceivedPacket(rtp_packet.SequenceNumber(), false); + } } } } -void RtpVideoReceiver::ProcessH264RtpPacket(RtpPacketH264& rtp_packet_h264) { +bool RtpVideoReceiver::ProcessH264RtpPacket(RtpPacketH264& rtp_packet_h264) { + bool is_missing_packet = false; if (!fec_enable_) { if (rtp::PAYLOAD_TYPE::H264 == rtp_packet_h264.PayloadType()) { rtp::NAL_UNIT_TYPE nalu_type = rtp_packet_h264.NalUnitType(); @@ -232,21 +241,31 @@ void RtpVideoReceiver::ProcessH264RtpPacket(RtpPacketH264& rtp_packet_h264) { } else if (rtp::NAL_UNIT_TYPE::FU_A == nalu_type) { incomplete_h264_frame_list_[rtp_packet_h264.SequenceNumber()] = rtp_packet_h264; - if (incomplete_h264_frame_list_.find( - rtp_packet_h264.SequenceNumber()) == - incomplete_h264_frame_list_.end()) { - LOG_ERROR("missing seq {}", rtp_packet_h264.SequenceNumber()); - } if (rtp_packet_h264.FuAEnd()) { CheckIsH264FrameCompletedFuaEndReceived(rtp_packet_h264); } else { auto missing_seqs_iter = missing_sequence_numbers_.find(rtp_packet_h264.Timestamp()); + auto missing_seqs_wait_ts_iter = + missing_sequence_numbers_wait_time_.find( + rtp_packet_h264.Timestamp()); if (missing_seqs_iter != missing_sequence_numbers_.end()) { - auto missing_seqs = missing_seqs_iter->second; - if (missing_seqs.find(rtp_packet_h264.SequenceNumber()) != - missing_seqs.end()) { - CheckIsH264FrameCompletedMissSeqReceived(rtp_packet_h264); + if (missing_seqs_wait_ts_iter != + missing_sequence_numbers_wait_time_.end()) { + if (clock_->CurrentTime().ms() - + missing_seqs_wait_ts_iter->second <= + MAX_WAIT_TIME_MS) { + auto missing_seqs = missing_seqs_iter->second; + if (missing_seqs.find(rtp_packet_h264.SequenceNumber()) != + missing_seqs.end()) { + CheckIsH264FrameCompletedMissSeqReceived(rtp_packet_h264); + is_missing_packet = true; + } + } else { + missing_sequence_numbers_wait_time_.erase( + missing_seqs_wait_ts_iter); + missing_sequence_numbers_.erase(missing_seqs_iter); + } } } } @@ -359,6 +378,8 @@ void RtpVideoReceiver::ProcessH264RtpPacket(RtpPacketH264& rtp_packet_h264) { // } // } // } + + return is_missing_packet; } void RtpVideoReceiver::ProcessAv1RtpPacket(RtpPacketAv1& rtp_packet_av1) { @@ -384,7 +405,7 @@ void RtpVideoReceiver::ProcessAv1RtpPacket(RtpPacketAv1& rtp_packet_av1) { bool RtpVideoReceiver::CheckIsH264FrameCompletedFuaEndReceived( RtpPacketH264& rtp_packet_h264) { - uint64_t timestamp = rtp_packet_h264.Timestamp(); + uint32_t timestamp = rtp_packet_h264.Timestamp(); uint16_t end_seq = rtp_packet_h264.SequenceNumber(); fua_end_sequence_numbers_[timestamp] = end_seq; uint16_t start_seq = 0; @@ -427,7 +448,7 @@ bool RtpVideoReceiver::CheckIsH264FrameCompletedMissSeqReceived( return false; } - uint64_t timestamp = rtp_packet_h264.Timestamp(); + uint32_t timestamp = rtp_packet_h264.Timestamp(); uint16_t end_seq = fua_end_sequence_numbers_[timestamp]; uint16_t start_seq = 0; bool has_start = false; @@ -468,7 +489,7 @@ bool RtpVideoReceiver::CheckIsH264FrameCompletedMissSeqReceived( } bool RtpVideoReceiver::PopCompleteFrame(uint16_t start_seq, uint16_t end_seq, - uint64_t timestamp) { + uint32_t timestamp) { size_t complete_frame_size = 0; int frame_fragment_count = 0; @@ -630,20 +651,24 @@ void RtpVideoReceiver::SendRemb(int64_t bitrate_bps, active_remb_module_->SetRemb(bitrate_bps, std::move(ssrcs)); } -bool RtpVideoReceiver::CheckIsTimeSendRR() { - uint32_t now_ts = static_cast( - std::chrono::duration_cast( - std::chrono::system_clock::now().time_since_epoch()) - .count()); - - if (now_ts - last_send_rtcp_rr_packet_ts_ >= RTCP_RR_INTERVAL) { - last_send_rtcp_rr_packet_ts_ = now_ts; +bool RtpVideoReceiver::CheckIsTimeSendRR(uint32_t now) { + if (now - last_send_rtcp_rr_packet_ts_ >= RTCP_RR_INTERVAL) { + last_send_rtcp_rr_packet_ts_ = now; return true; } else { return false; } } +void RtpVideoReceiver::CheckIsTimeUpdateNack(uint32_t now) { + if (now - last_nack_update_ts_ >= NACK_UPDATE_INTERVAL) { + last_send_rtcp_rr_packet_ts_ = now; + if (nack_) { + nack_->ProcessNacks(); + } + } +} + bool RtpVideoReceiver::Process() { if (!compelete_video_frame_queue_.isEmpty()) { std::optional video_frame = diff --git a/src/transport/channel/rtp_video_receiver.h b/src/transport/channel/rtp_video_receiver.h index c5a710e..47350f9 100644 --- a/src/transport/channel/rtp_video_receiver.h +++ b/src/transport/channel/rtp_video_receiver.h @@ -55,14 +55,15 @@ class RtpVideoReceiver : public ThreadBase, bool CheckIsAv1FrameCompleted(RtpPacketAv1& rtp_packet_av1); private: - void ProcessH264RtpPacket(RtpPacketH264& rtp_packet_h264); + bool ProcessH264RtpPacket(RtpPacketH264& rtp_packet_h264); bool CheckIsH264FrameCompletedFuaEndReceived(RtpPacketH264& rtp_packet_h264); bool CheckIsH264FrameCompletedMissSeqReceived(RtpPacketH264& rtp_packet_h264); bool PopCompleteFrame(uint16_t start_seq, uint16_t end_seq, - uint64_t timestamp); + uint32_t timestamp); private: - bool CheckIsTimeSendRR(); + bool CheckIsTimeSendRR(uint32_t now); + void CheckIsTimeUpdateNack(uint32_t now); int SendRtcpRR(ReceiverReport& rtcp_rr); void SendCombinedRtcpPacket( @@ -106,6 +107,7 @@ class RtpVideoReceiver : public ThreadBase, uint32_t total_rtp_payload_recv_ = 0; uint32_t last_send_rtcp_rr_packet_ts_ = 0; + uint32_t last_nack_update_ts_ = 0; std::function data_send_func_ = nullptr; private: diff --git a/src/transport/channel/video_channel_send.cpp b/src/transport/channel/video_channel_send.cpp index ef9cd5e..f225fc3 100644 --- a/src/transport/channel/video_channel_send.cpp +++ b/src/transport/channel/video_channel_send.cpp @@ -107,7 +107,7 @@ void VideoChannelSend::Destroy() { int VideoChannelSend::SendVideo(std::shared_ptr encoded_frame) { if (rtp_video_sender_ && rtp_packetizer_) { - int64_t rtp_timestamp = + int32_t rtp_timestamp = delta_ntp_internal_ms_ + static_cast(encoded_frame->CapturedTimestamp() / 1000); std::vector> rtp_packets = diff --git a/src/transport/ice_transport_controller.cpp b/src/transport/ice_transport_controller.cpp index c2cc7dc..6ef4134 100644 --- a/src/transport/ice_transport_controller.cpp +++ b/src/transport/ice_transport_controller.cpp @@ -53,8 +53,10 @@ void IceTransportController::Create( CreateVideoCodec(clock_, video_codec_payload_type, hardware_acceleration); CreateAudioCodec(); + task_queue_ = std::make_shared(); controller_ = std::make_unique(); - packet_sender_ = std::make_shared(ice_agent, webrtc_clock_); + packet_sender_ = + std::make_shared(ice_agent, webrtc_clock_, task_queue_); packet_sender_->SetPacingRates(DataRate::BitsPerSec(300000), DataRate::Zero()); packet_sender_->SetSendBurstInterval(TimeDelta::Millis(40)); @@ -481,9 +483,13 @@ void IceTransportController::OnReceiverReport( msg.receive_time = now; msg.start_time = last_report_block_time_; msg.end_time = now; - if (controller_) { - PostUpdates(controller_->OnTransportLossReport(msg)); - } + + task_queue_->PostTask([this, msg]() mutable { + if (controller_) { + PostUpdates(controller_->OnTransportLossReport(msg)); + } + }); + last_report_block_time_ = now; } @@ -492,19 +498,18 @@ void IceTransportController::OnCongestionControlFeedback( std::optional feedback_msg = transport_feedback_adapter_.ProcessCongestionControlFeedback( feedback, Timestamp::Micros(clock_->CurrentTimeUs())); - if (feedback_msg) { - HandleTransportPacketsFeedback(*feedback_msg); + if (feedback_msg.has_value()) { + task_queue_->PostTask([this, feedback_msg]() mutable { + if (controller_) { + PostUpdates( + controller_->OnTransportPacketsFeedback(feedback_msg.value())); + } + }); + + UpdateCongestedState(); } } -void IceTransportController::HandleTransportPacketsFeedback( - const webrtc::TransportPacketsFeedback& feedback) { - if (controller_) - PostUpdates(controller_->OnTransportPacketsFeedback(feedback)); - - UpdateCongestedState(); -} - void IceTransportController::OnReceiveNack( const std::vector& nack_sequence_numbers) { if (video_channel_send_) { @@ -512,12 +517,6 @@ void IceTransportController::OnReceiveNack( } } -void IceTransportController::UpdateControllerWithTimeInterval() { - ProcessInterval msg; - msg.at_time = Timestamp::Millis(webrtc_clock_->TimeInMilliseconds()); - PostUpdates(controller_->OnProcessInterval(msg)); -} - void IceTransportController::OnSentRtpPacket( const webrtc::RtpPacketToSend& packet) { webrtc::PacedPacketInfo pacing_info; @@ -604,8 +603,11 @@ std::optional IceTransportController::GetCongestedStateUpdate() const { } bool IceTransportController::Process() { - webrtc::ProcessInterval msg; - msg.at_time = Timestamp::Millis(webrtc_clock_->TimeInMilliseconds()); - PostUpdates(controller_->OnProcessInterval(msg)); + task_queue_->PostTask([this]() mutable { + webrtc::ProcessInterval msg; + msg.at_time = Timestamp::Millis(webrtc_clock_->TimeInMilliseconds()); + PostUpdates(controller_->OnProcessInterval(msg)); + }); + return true; } \ No newline at end of file diff --git a/src/transport/ice_transport_controller.h b/src/transport/ice_transport_controller.h index a482f22..3965f17 100644 --- a/src/transport/ice_transport_controller.h +++ b/src/transport/ice_transport_controller.h @@ -24,6 +24,7 @@ #include "packet_sender.h" #include "packet_sender_imp.h" #include "resolution_adapter.h" +#include "task_queue.h" #include "transport_feedback_adapter.h" #include "video_channel_receive.h" #include "video_channel_send.h" @@ -82,10 +83,7 @@ class IceTransportController int CreateAudioCodec(); private: - void UpdateControllerWithTimeInterval(); void OnSentRtpPacket(const webrtc::RtpPacketToSend &packet); - void HandleTransportPacketsFeedback( - const webrtc::TransportPacketsFeedback &feedback); void PostUpdates(webrtc::NetworkControlUpdate update); void UpdateControlState(); void UpdateCongestedState(); @@ -121,6 +119,7 @@ class IceTransportController webrtc::TransportFeedbackAdapter transport_feedback_adapter_; std::unique_ptr controller_; BitrateProber prober_; + std::shared_ptr task_queue_; webrtc::DataSize congestion_window_size_; bool is_congested_ = false; diff --git a/src/transport/packet_sender/packet_sender_imp.cpp b/src/transport/packet_sender/packet_sender_imp.cpp index e376b9f..72045fe 100644 --- a/src/transport/packet_sender/packet_sender_imp.cpp +++ b/src/transport/packet_sender/packet_sender_imp.cpp @@ -6,7 +6,8 @@ const int PacketSenderImp::kNoPacketHoldback = -1; PacketSenderImp::PacketSenderImp(std::shared_ptr ice_agent, - std::shared_ptr clock) + std::shared_ptr clock, + std::shared_ptr task_queue) : ice_agent_(ice_agent), clock_(clock), pacing_controller_(clock.get(), this), @@ -18,7 +19,8 @@ PacketSenderImp::PacketSenderImp(std::shared_ptr ice_agent, packet_size_(/*alpha=*/0.95), include_overhead_(false), last_send_time_(webrtc::Timestamp::Millis(0)), - last_call_time_(webrtc::Timestamp::Millis(0)) {} + last_call_time_(webrtc::Timestamp::Millis(0)), + task_queue_(task_queue) {} PacketSenderImp::~PacketSenderImp() {} @@ -80,7 +82,7 @@ void PacketSenderImp::SetPacingRates(webrtc::DataRate pacing_rate, void PacketSenderImp::EnqueuePackets( std::vector> packets) { - task_queue_.PostTask([this, packets = std::move(packets)]() mutable { + task_queue_->PostTask([this, packets = std::move(packets)]() mutable { for (auto &packet : packets) { size_t packet_size = packet->payload_size() + packet->padding_size(); if (include_overhead_) { @@ -94,7 +96,7 @@ void PacketSenderImp::EnqueuePackets( } void PacketSenderImp::RemovePacketsForSsrc(uint32_t ssrc) { - task_queue_.PostTask([this, ssrc] { + task_queue_->PostTask([this, ssrc] { pacing_controller_.RemovePacketsForSsrc(ssrc); MaybeProcessPackets(webrtc::Timestamp::MinusInfinity()); }); @@ -226,7 +228,7 @@ void PacketSenderImp::MaybeProcessPackets( if (next_process_time_.IsMinusInfinity() || next_process_time_ > next_send_time) { // Prefer low precision if allowed and not probing. - task_queue_.PostDelayedTask( + task_queue_->PostDelayedTask( [this, next_send_time]() { MaybeProcessPackets(next_send_time); }, time_to_next_process.RoundUpTo(webrtc::TimeDelta::Millis(1)).ms()); next_process_time_ = next_send_time; @@ -281,7 +283,6 @@ int PacketSenderImp::EnqueueRtpPacket( rtp_packet_to_send->set_packet_type(webrtc::RtpPacketMediaType::kVideo); break; } - // webrtc::PacedPacketInfo cluster_info; // SendPacket(std::move(rtp_packet_to_send), cluster_info); diff --git a/src/transport/packet_sender/packet_sender_imp.h b/src/transport/packet_sender/packet_sender_imp.h index 4c45592..3ef57e0 100644 --- a/src/transport/packet_sender/packet_sender_imp.h +++ b/src/transport/packet_sender/packet_sender_imp.h @@ -31,7 +31,8 @@ class PacketSenderImp : public PacketSender, static const int kNoPacketHoldback; PacketSenderImp(std::shared_ptr ice_agent, - std::shared_ptr clock); + std::shared_ptr clock, + std::shared_ptr task_queue); ~PacketSenderImp(); public: @@ -221,7 +222,7 @@ class PacketSenderImp : public PacketSender, // Protects against ProcessPackets reentry from packet sent receipts. bool processing_packets_ = false; - TaskQueue task_queue_; + std::shared_ptr task_queue_; int64_t transport_seq_ = 0; std::map ssrc_seq_;