diff --git a/src/media/video/decode/nvcodec/nvidia_video_decoder.cpp b/src/media/video/decode/nvcodec/nvidia_video_decoder.cpp index 144836e..3dfd3b0 100644 --- a/src/media/video/decode/nvcodec/nvidia_video_decoder.cpp +++ b/src/media/video/decode/nvcodec/nvidia_video_decoder.cpp @@ -80,7 +80,7 @@ int NvidiaVideoDecoder::Decode( #endif if ((*(data + 4) & 0x1f) == 0x07) { - LOG_WARN("Receive key frame"); + LOG_INFO("Receive key frame"); } int num_frame_returned = decoder->Decode(data, (int)size); diff --git a/src/transport/channel/audio_channel_send.cpp b/src/transport/channel/audio_channel_send.cpp index 542c46e..a76aaeb 100644 --- a/src/transport/channel/audio_channel_send.cpp +++ b/src/transport/channel/audio_channel_send.cpp @@ -8,9 +8,9 @@ AudioChannelSend::~AudioChannelSend() {} AudioChannelSend::AudioChannelSend( std::shared_ptr ice_agent, - std::shared_ptr packet_sender, + std::shared_ptr packet_sender, std::shared_ptr ice_io_statistics) - : packet_sender_(packet_sender), + : paced_sender_(packet_sender), ice_agent_(ice_agent), ice_io_statistics_(ice_io_statistics) {} @@ -49,7 +49,7 @@ int AudioChannelSend::SendAudio(char *data, size_t size) { if (rtp_audio_sender_ && rtp_packetizer_) { std::vector> rtp_packets = rtp_packetizer_->Build((uint8_t *)data, (uint32_t)size, 0, true); - // packet_sender_->EnqueueRtpPackets(rtp_packets, 0); + // paced_sender_->EnqueueRtpPackets(rtp_packets, 0); rtp_audio_sender_->Enqueue(rtp_packets); } diff --git a/src/transport/channel/audio_channel_send.h b/src/transport/channel/audio_channel_send.h index 9566435..4efef80 100644 --- a/src/transport/channel/audio_channel_send.h +++ b/src/transport/channel/audio_channel_send.h @@ -8,7 +8,7 @@ #define _AUDIO_CHANNEL_SEND_H_ #include "ice_agent.h" -#include "packet_sender.h" +#include "paced_sender.h" #include "rtp_audio_sender.h" #include "rtp_packetizer.h" @@ -16,7 +16,7 @@ class AudioChannelSend { public: AudioChannelSend(); AudioChannelSend(std::shared_ptr ice_agent, - std::shared_ptr packet_sender, + std::shared_ptr packet_sender, std::shared_ptr ice_io_statistics); ~AudioChannelSend(); @@ -36,7 +36,7 @@ class AudioChannelSend { void OnReceiverReport(const ReceiverReport& receiver_report) {} private: - std::shared_ptr packet_sender_ = nullptr; + std::shared_ptr paced_sender_ = nullptr; std::shared_ptr ice_agent_ = nullptr; std::shared_ptr ice_io_statistics_ = nullptr; std::unique_ptr rtp_packetizer_ = nullptr; diff --git a/src/transport/channel/data_channel_send.cpp b/src/transport/channel/data_channel_send.cpp index 1211607..9addd9b 100644 --- a/src/transport/channel/data_channel_send.cpp +++ b/src/transport/channel/data_channel_send.cpp @@ -8,9 +8,9 @@ DataChannelSend::~DataChannelSend() {} DataChannelSend::DataChannelSend( std::shared_ptr ice_agent, - std::shared_ptr packet_sender, + std::shared_ptr packet_sender, std::shared_ptr ice_io_statistics) - : packet_sender_(packet_sender), + : paced_sender_(packet_sender), ice_agent_(ice_agent), ice_io_statistics_(ice_io_statistics) {} @@ -49,7 +49,7 @@ int DataChannelSend::SendData(const char *data, size_t size) { if (rtp_data_sender_ && rtp_packetizer_) { std::vector> rtp_packets = rtp_packetizer_->Build((uint8_t *)data, (uint32_t)size, 0, true); - // packet_sender_->EnqueueRtpPackets(rtp_packets, 0); + // paced_sender_->EnqueueRtpPackets(rtp_packets, 0); rtp_data_sender_->Enqueue(rtp_packets); } diff --git a/src/transport/channel/data_channel_send.h b/src/transport/channel/data_channel_send.h index d8cd00e..3cd7dcc 100644 --- a/src/transport/channel/data_channel_send.h +++ b/src/transport/channel/data_channel_send.h @@ -8,7 +8,7 @@ #define _DATA_CHANNEL_SEND_H_ #include "ice_agent.h" -#include "packet_sender.h" +#include "paced_sender.h" #include "rtp_data_sender.h" #include "rtp_packetizer.h" @@ -16,7 +16,7 @@ class DataChannelSend { public: DataChannelSend(); DataChannelSend(std::shared_ptr ice_agent, - std::shared_ptr packet_sender, + std::shared_ptr packet_sender, std::shared_ptr ice_io_statistics); ~DataChannelSend(); @@ -36,7 +36,7 @@ class DataChannelSend { void OnReceiverReport(const ReceiverReport& receiver_report) {} private: - std::shared_ptr packet_sender_ = nullptr; + std::shared_ptr paced_sender_ = nullptr; std::shared_ptr ice_agent_ = nullptr; std::shared_ptr ice_io_statistics_ = nullptr; std::unique_ptr rtp_packetizer_ = nullptr; diff --git a/src/transport/channel/video_channel_send.cpp b/src/transport/channel/video_channel_send.cpp index f6cf4c8..f8e6088 100644 --- a/src/transport/channel/video_channel_send.cpp +++ b/src/transport/channel/video_channel_send.cpp @@ -8,16 +8,13 @@ VideoChannelSend::VideoChannelSend( std::shared_ptr clock, std::shared_ptr ice_agent, - std::shared_ptr packet_sender, - std::shared_ptr ice_io_statistics, - std::function - on_sent_packet_func) + std::shared_ptr packet_sender, + std::shared_ptr ice_io_statistics) : ice_agent_(ice_agent), - packet_sender_(packet_sender), + paced_sender_(packet_sender), ssrc_(GenerateUniqueSsrc()), rtx_ssrc_(GenerateUniqueSsrc()), ice_io_statistics_(ice_io_statistics), - on_sent_packet_func_(on_sent_packet_func), delta_ntp_internal_ms_(clock->CurrentNtpInMilliseconds() - clock->CurrentTimeMs()), rtp_packet_history_(clock), @@ -42,16 +39,20 @@ VideoChannelSend::~VideoChannelSend() { void VideoChannelSend::Initialize(rtp::PAYLOAD_TYPE payload_type) { rtp_packetizer_ = RtpPacketizer::Create(payload_type, ssrc_); + task_queue_history_ = std::make_shared("rtp pakcet history"); } void VideoChannelSend::OnSentRtpPacket( std::unique_ptr packet) { - if (packet->retransmitted_sequence_number()) { - rtp_packet_history_.MarkPacketAsSent( - *packet->retransmitted_sequence_number()); - } else if (packet->PayloadType() != rtp::PAYLOAD_TYPE::H264 - 1) { - rtp_packet_history_.PutRtpPacket(std::move(packet), clock_->CurrentTime()); - } + task_queue_history_->PostTask([this, packet = std::move(packet)]() mutable { + if (packet->retransmitted_sequence_number()) { + rtp_packet_history_.MarkPacketAsSent( + packet->retransmitted_sequence_number().value()); + } else if (packet->PayloadType() != rtp::PAYLOAD_TYPE::H264 - 1) { + rtp_packet_history_.PutRtpPacket(std::move(packet), + clock_->CurrentTime()); + } + }); } void VideoChannelSend::OnReceiveNack( @@ -65,7 +66,11 @@ void VideoChannelSend::OnReceiveNack( // } int64_t avg_rtt = 10; - rtp_packet_history_.SetRtt(TimeDelta::Millis(5 + avg_rtt)); + + task_queue_history_->PostTask([this, avg_rtt]() { + rtp_packet_history_.SetRtt(TimeDelta::Millis(5 + avg_rtt)); + }); + for (uint16_t seq_no : nack_sequence_numbers) { const int32_t bytes_sent = ReSendPacket(seq_no); if (bytes_sent < 0) { @@ -89,7 +94,7 @@ std::vector> VideoChannelSend::GeneratePadding( void VideoChannelSend::Destroy() {} int VideoChannelSend::SendVideo(const EncodedFrame& encoded_frame) { - if (rtp_packetizer_ && packet_sender_) { + if (rtp_packetizer_ && paced_sender_) { uint32_t rtp_timestamp = delta_ntp_internal_ms_ + static_cast(encoded_frame.CapturedTimestamp() / 1000); @@ -102,54 +107,55 @@ int VideoChannelSend::SendVideo(const EncodedFrame& encoded_frame) { fwrite((unsigned char*)encoded_frame.Buffer(), 1, encoded_frame.Size(), file_rtp_sent_); #endif - packet_sender_->EnqueueRtpPackets(std::move(rtp_packets), rtp_timestamp); + paced_sender_->EnqueueRtpPackets(std::move(rtp_packets), rtp_timestamp); } return 0; } int32_t VideoChannelSend::ReSendPacket(uint16_t packet_id) { - int32_t packet_size = 0; + task_queue_history_->PostTask([this, packet_id]() { + int32_t packet_size = 0; + std::unique_ptr packet = + rtp_packet_history_.GetPacketAndMarkAsPending( + packet_id, [&](const webrtc::RtpPacketToSend& stored_packet) { + // Check if we're overusing retransmission bitrate. + // TODO(sprang): Add histograms for nack success or failure + // reasons. + packet_size = stored_packet.size(); + std::unique_ptr retransmit_packet; - std::unique_ptr packet = - rtp_packet_history_.GetPacketAndMarkAsPending( - packet_id, [&](const webrtc::RtpPacketToSend& stored_packet) { - // Check if we're overusing retransmission bitrate. - // TODO(sprang): Add histograms for nack success or failure - // reasons. - packet_size = stored_packet.size(); - std::unique_ptr retransmit_packet; + retransmit_packet = + std::make_unique(stored_packet); - retransmit_packet = - std::make_unique(stored_packet); + retransmit_packet->SetSsrc(rtx_ssrc_); + retransmit_packet->SetPayloadType(rtp::PAYLOAD_TYPE::RTX); - retransmit_packet->SetSsrc(rtx_ssrc_); - retransmit_packet->SetPayloadType(rtp::PAYLOAD_TYPE::RTX); + retransmit_packet->set_retransmitted_sequence_number( + stored_packet.SequenceNumber()); + retransmit_packet->set_original_ssrc(stored_packet.Ssrc()); + retransmit_packet->BuildRtxPacket(); - retransmit_packet->set_retransmitted_sequence_number( - stored_packet.SequenceNumber()); - retransmit_packet->set_original_ssrc(stored_packet.Ssrc()); - retransmit_packet->BuildRtxPacket(); + return retransmit_packet; + }); + if (packet_size == 0) { + // Packet not found or already queued for retransmission, ignore. + return; + } + if (!packet) { + // Packet was found, but lambda helper above chose not to create + // `retransmit_packet` out of it. + LOG_WARN("packet not found"); + return; + } - return retransmit_packet; - }); - if (packet_size == 0) { - // Packet not found or already queued for retransmission, ignore. - return 0; - } - if (!packet) { - // Packet was found, but lambda helper above chose not to create - // `retransmit_packet` out of it. - LOG_WARN("packet not found"); - return -1; - } + packet->set_packet_type(webrtc::RtpPacketMediaType::kRetransmission); + packet->set_fec_protect_packet(false); - packet->set_packet_type(webrtc::RtpPacketMediaType::kRetransmission); - packet->set_fec_protect_packet(false); + if (paced_sender_) { + paced_sender_->EnqueueRtpPacket(std::move(packet)); + } + }); - if (packet_sender_) { - packet_sender_->EnqueueRtpPacket(std::move(packet)); - } - - return packet_size; + return 0; } \ No newline at end of file diff --git a/src/transport/channel/video_channel_send.h b/src/transport/channel/video_channel_send.h index a6b8154..3a4e411 100644 --- a/src/transport/channel/video_channel_send.h +++ b/src/transport/channel/video_channel_send.h @@ -14,20 +14,19 @@ #include "congestion_control_feedback.h" #include "encoded_frame.h" #include "ice_agent.h" -#include "packet_sender.h" +#include "paced_sender.h" #include "rtp_packet_history.h" #include "rtp_packetizer.h" #include "rtp_video_sender.h" +#include "task_queue.h" #include "transport_feedback_adapter.h" class VideoChannelSend { public: VideoChannelSend(std::shared_ptr clock, std::shared_ptr ice_agent, - std::shared_ptr packet_sender, - std::shared_ptr ice_io_statistics, - std::function - on_sent_packet_func_); + std::shared_ptr packet_sender, + std::shared_ptr ice_io_statistics); ~VideoChannelSend(); void OnSentRtpPacket(std::unique_ptr packet); @@ -62,18 +61,16 @@ class VideoChannelSend { int32_t ReSendPacket(uint16_t packet_id); private: - std::shared_ptr packet_sender_ = nullptr; + std::shared_ptr paced_sender_ = nullptr; std::shared_ptr ice_agent_ = nullptr; std::shared_ptr ice_io_statistics_ = nullptr; std::unique_ptr rtp_packetizer_ = nullptr; - std::function - on_sent_packet_func_ = nullptr; - private: uint32_t ssrc_ = 0; uint32_t rtx_ssrc_ = 0; std::shared_ptr clock_; + std::shared_ptr task_queue_history_; RtpPacketHistory rtp_packet_history_; int64_t delta_ntp_internal_ms_; diff --git a/src/transport/ice_transport_controller.cpp b/src/transport/ice_transport_controller.cpp index 7add7b2..eb70cf5 100644 --- a/src/transport/ice_transport_controller.cpp +++ b/src/transport/ice_transport_controller.cpp @@ -70,19 +70,18 @@ void IceTransportController::Create( task_queue_decode_ = std::make_shared("decode"); controller_ = std::make_unique(); - packet_sender_ = std::make_shared(ice_agent, webrtc_clock_, - task_queue_cc_); - packet_sender_->SetPacingRates(DataRate::BitsPerSec(300000), - DataRate::Zero()); - packet_sender_->SetSendBurstInterval(TimeDelta::Millis(40)); - packet_sender_->SetQueueTimeLimit(TimeDelta::Millis(2000)); - packet_sender_->SetOnSentPacketFunc( + paced_sender_ = + std::make_shared(ice_agent, webrtc_clock_, task_queue_cc_); + paced_sender_->SetPacingRates(DataRate::BitsPerSec(300000), DataRate::Zero()); + paced_sender_->SetSendBurstInterval(TimeDelta::Millis(40)); + paced_sender_->SetQueueTimeLimit(TimeDelta::Millis(2000)); + paced_sender_->SetOnSentPacketFunc( [this](std::unique_ptr packet) { if (ice_agent_) { webrtc::Timestamp now = webrtc_clock_->CurrentTime(); ice_agent_->Send((const char*)packet->Buffer().data(), packet->Size()); - OnSentRtpPacket(*packet); + OnSentPacket(*packet); if (packet->packet_type().has_value()) { switch (packet->packet_type().value()) { @@ -102,13 +101,10 @@ void IceTransportController::Create( resolution_adapter_ = std::make_unique(); video_channel_send_ = std::make_unique( - clock_, ice_agent, packet_sender_, ice_io_statistics, - [this](const webrtc::RtpPacketToSend& packet) { - OnSentRtpPacket(packet); - }); + clock_, ice_agent, paced_sender_, ice_io_statistics); - if (packet_sender_) { - packet_sender_->SetGeneratePaddingFunc( + if (paced_sender_) { + paced_sender_->SetGeneratePaddingFunc( [this](uint32_t size, int64_t captured_timestamp_us) -> std::vector> { return video_channel_send_->GeneratePadding(size, @@ -117,9 +113,9 @@ void IceTransportController::Create( } audio_channel_send_ = std::make_unique( - ice_agent, packet_sender_, ice_io_statistics); + ice_agent, paced_sender_, ice_io_statistics); data_channel_send_ = std::make_unique( - ice_agent, packet_sender_, ice_io_statistics); + ice_agent, paced_sender_, ice_io_statistics); if (video_channel_send_) { video_channel_send_->Initialize(video_codec_payload_type); @@ -285,8 +281,8 @@ void IceTransportController::UpdateNetworkAvaliablity(bool network_available) { controller_->OnNetworkAvailability(msg); } - if (packet_sender_) { - packet_sender_->EnsureStarted(); + if (paced_sender_) { + paced_sender_->EnsureStarted(); } } @@ -556,7 +552,7 @@ void IceTransportController::OnReceiveNack( } } -void IceTransportController::OnSentRtpPacket( +void IceTransportController::OnSentPacket( const webrtc::RtpPacketToSend& packet) { webrtc::PacedPacketInfo pacing_info; size_t transport_overhead_bytes_per_packet_ = 0; @@ -582,14 +578,13 @@ void IceTransportController::PostUpdates(webrtc::NetworkControlUpdate update) { UpdateCongestedState(); } - if (update.pacer_config && packet_sender_) { - packet_sender_->SetPacingRates(update.pacer_config->data_rate(), - update.pacer_config->pad_rate()); + if (update.pacer_config && paced_sender_) { + paced_sender_->SetPacingRates(update.pacer_config->data_rate(), + update.pacer_config->pad_rate()); } - if (!update.probe_cluster_configs.empty() && packet_sender_) { - packet_sender_->CreateProbeClusters( - std::move(update.probe_cluster_configs)); + if (!update.probe_cluster_configs.empty() && paced_sender_) { + paced_sender_->CreateProbeClusters(std::move(update.probe_cluster_configs)); } if (update.target_rate) { @@ -630,8 +625,8 @@ void IceTransportController::UpdateControlState() { void IceTransportController::UpdateCongestedState() { if (auto update = GetCongestedStateUpdate()) { is_congested_ = update.value(); - if (packet_sender_) { - packet_sender_->SetCongested(update.value()); + if (paced_sender_) { + paced_sender_->SetCongested(update.value()); } } } diff --git a/src/transport/ice_transport_controller.h b/src/transport/ice_transport_controller.h index dbdf1ce..ca83322 100644 --- a/src/transport/ice_transport_controller.h +++ b/src/transport/ice_transport_controller.h @@ -21,8 +21,7 @@ #include "data_channel_receive.h" #include "data_channel_send.h" #include "ice_agent.h" -#include "packet_sender.h" -#include "packet_sender_imp.h" +#include "paced_sender.h" #include "resolution_adapter.h" #include "task_queue.h" #include "transport_feedback_adapter.h" @@ -83,7 +82,7 @@ class IceTransportController int CreateAudioCodec(); private: - void OnSentRtpPacket(const webrtc::RtpPacketToSend &packet); + void OnSentPacket(const webrtc::RtpPacketToSend &packet); void PostUpdates(webrtc::NetworkControlUpdate update); void UpdateControlState(); void UpdateCongestedState(); @@ -109,7 +108,7 @@ class IceTransportController std::shared_ptr ice_agent_ = nullptr; std::shared_ptr ice_io_statistics_ = nullptr; std::unique_ptr rtp_packetizer_ = nullptr; - std::shared_ptr packet_sender_ = nullptr; + std::shared_ptr paced_sender_ = nullptr; std::string remote_user_id_; void *user_data_ = nullptr; std::atomic is_running_; diff --git a/src/transport/packet_sender/packet_sender_imp.cpp b/src/transport/paced_sender/paced_sender.cpp similarity index 82% rename from src/transport/packet_sender/packet_sender_imp.cpp rename to src/transport/paced_sender/paced_sender.cpp index 97c11b5..50dc98f 100644 --- a/src/transport/packet_sender/packet_sender_imp.cpp +++ b/src/transport/paced_sender/paced_sender.cpp @@ -1,13 +1,13 @@ -#include "packet_sender_imp.h" +#include "paced_sender.h" #include "log.h" -const int PacketSenderImp::kNoPacketHoldback = -1; +const int PacedSender::kNoPacketHoldback = -1; -PacketSenderImp::PacketSenderImp(std::shared_ptr ice_agent, - std::shared_ptr clock, - std::shared_ptr task_queue) +PacedSender::PacedSender(std::shared_ptr ice_agent, + std::shared_ptr clock, + std::shared_ptr task_queue) : ice_agent_(ice_agent), clock_(clock), pacing_controller_(clock.get(), this), @@ -22,10 +22,10 @@ PacketSenderImp::PacketSenderImp(std::shared_ptr ice_agent, last_call_time_(webrtc::Timestamp::Millis(0)), task_queue_(task_queue) {} -PacketSenderImp::~PacketSenderImp() { is_shutdown_ = true; } +PacedSender::~PacedSender() { is_shutdown_ = true; } std::vector> -PacketSenderImp::GeneratePadding(webrtc::DataSize size) { +PacedSender::GeneratePadding(webrtc::DataSize size) { std::vector> to_send_rtp_packets; std::vector> rtp_packets = generat_padding_func_(size.bytes(), clock_->CurrentTime().ms()); @@ -43,44 +43,44 @@ PacketSenderImp::GeneratePadding(webrtc::DataSize size) { return to_send_rtp_packets; } -void PacketSenderImp::SetSendBurstInterval(webrtc::TimeDelta burst_interval) { +void PacedSender::SetSendBurstInterval(webrtc::TimeDelta burst_interval) { pacing_controller_.SetSendBurstInterval(burst_interval); } -void PacketSenderImp::SetAllowProbeWithoutMediaPacket(bool allow) { +void PacedSender::SetAllowProbeWithoutMediaPacket(bool allow) { pacing_controller_.SetAllowProbeWithoutMediaPacket(allow); } -void PacketSenderImp::EnsureStarted() { +void PacedSender::EnsureStarted() { is_started_ = true; MaybeProcessPackets(webrtc::Timestamp::MinusInfinity()); } -void PacketSenderImp::CreateProbeClusters( +void PacedSender::CreateProbeClusters( std::vector probe_cluster_configs) { pacing_controller_.CreateProbeClusters(probe_cluster_configs); MaybeScheduleProcessPackets(); } -void PacketSenderImp::Pause() { pacing_controller_.Pause(); } +void PacedSender::Pause() { pacing_controller_.Pause(); } -void PacketSenderImp::Resume() { +void PacedSender::Resume() { pacing_controller_.Resume(); MaybeProcessPackets(webrtc::Timestamp::MinusInfinity()); } -void PacketSenderImp::SetCongested(bool congested) { +void PacedSender::SetCongested(bool congested) { pacing_controller_.SetCongested(congested); MaybeScheduleProcessPackets(); } -void PacketSenderImp::SetPacingRates(webrtc::DataRate pacing_rate, - webrtc::DataRate padding_rate) { +void PacedSender::SetPacingRates(webrtc::DataRate pacing_rate, + webrtc::DataRate padding_rate) { pacing_controller_.SetPacingRates(pacing_rate, padding_rate); MaybeScheduleProcessPackets(); } -void PacketSenderImp::EnqueuePackets( +void PacedSender::EnqueuePackets( std::vector> packets) { task_queue_->PostTask([this, packets = std::move(packets)]() mutable { for (auto &packet : packets) { @@ -95,7 +95,7 @@ void PacketSenderImp::EnqueuePackets( }); } -void PacketSenderImp::EnqueuePacket( +void PacedSender::EnqueuePacket( std::unique_ptr packet) { task_queue_->PostTask([this, packet = std::move(packet)]() mutable { size_t packet_size = packet->payload_size() + packet->padding_size(); @@ -109,48 +109,47 @@ void PacketSenderImp::EnqueuePacket( }); } -void PacketSenderImp::RemovePacketsForSsrc(uint32_t ssrc) { +void PacedSender::RemovePacketsForSsrc(uint32_t ssrc) { task_queue_->PostTask([this, ssrc] { pacing_controller_.RemovePacketsForSsrc(ssrc); MaybeProcessPackets(webrtc::Timestamp::MinusInfinity()); }); } -void PacketSenderImp::SetAccountForAudioPackets(bool account_for_audio) { +void PacedSender::SetAccountForAudioPackets(bool account_for_audio) { pacing_controller_.SetAccountForAudioPackets(account_for_audio); MaybeProcessPackets(webrtc::Timestamp::MinusInfinity()); } -void PacketSenderImp::SetIncludeOverhead() { +void PacedSender::SetIncludeOverhead() { include_overhead_ = true; pacing_controller_.SetIncludeOverhead(); MaybeProcessPackets(webrtc::Timestamp::MinusInfinity()); } -void PacketSenderImp::SetTransportOverhead( - webrtc::DataSize overhead_per_packet) { +void PacedSender::SetTransportOverhead(webrtc::DataSize overhead_per_packet) { pacing_controller_.SetTransportOverhead(overhead_per_packet); MaybeProcessPackets(webrtc::Timestamp::MinusInfinity()); } -void PacketSenderImp::SetQueueTimeLimit(webrtc::TimeDelta limit) { +void PacedSender::SetQueueTimeLimit(webrtc::TimeDelta limit) { pacing_controller_.SetQueueTimeLimit(limit); MaybeProcessPackets(webrtc::Timestamp::MinusInfinity()); } -webrtc::TimeDelta PacketSenderImp::ExpectedQueueTime() const { +webrtc::TimeDelta PacedSender::ExpectedQueueTime() const { return GetStats().expected_queue_time; } -webrtc::DataSize PacketSenderImp::QueueSizeData() const { +webrtc::DataSize PacedSender::QueueSizeData() const { return GetStats().queue_size; } -std::optional PacketSenderImp::FirstSentPacketTime() const { +std::optional PacedSender::FirstSentPacketTime() const { return GetStats().first_sent_packet_time; } -webrtc::TimeDelta PacketSenderImp::OldestPacketWaitTime() const { +webrtc::TimeDelta PacedSender::OldestPacketWaitTime() const { webrtc::Timestamp oldest_packet = GetStats().oldest_packet_enqueue_time; if (oldest_packet.IsInfinite()) { return webrtc::TimeDelta::Zero(); @@ -165,17 +164,15 @@ webrtc::TimeDelta PacketSenderImp::OldestPacketWaitTime() const { return current - oldest_packet; } -void PacketSenderImp::OnStatsUpdated(const Stats &stats) { - current_stats_ = stats; -} +void PacedSender::OnStatsUpdated(const Stats &stats) { current_stats_ = stats; } -void PacketSenderImp::MaybeScheduleProcessPackets() { +void PacedSender::MaybeScheduleProcessPackets() { if (!processing_packets_) { MaybeProcessPackets(webrtc::Timestamp::MinusInfinity()); } } -void PacketSenderImp::MaybeProcessPackets( +void PacedSender::MaybeProcessPackets( webrtc::Timestamp scheduled_process_time) { if (is_shutdown_ || !is_started_) { return; @@ -248,7 +245,7 @@ void PacketSenderImp::MaybeProcessPackets( } } -void PacketSenderImp::UpdateStats() { +void PacedSender::UpdateStats() { Stats new_stats; new_stats.expected_queue_time = pacing_controller_.ExpectedQueueTime(); new_stats.first_sent_packet_time = pacing_controller_.FirstSentPacketTime(); @@ -258,13 +255,11 @@ void PacketSenderImp::UpdateStats() { OnStatsUpdated(new_stats); } -PacketSenderImp::Stats PacketSenderImp::GetStats() const { - return current_stats_; -} +PacedSender::Stats PacedSender::GetStats() const { return current_stats_; } /*----------------------------------------------------------------------------*/ -int PacketSenderImp::EnqueueRtpPackets( +int PacedSender::EnqueueRtpPackets( std::vector> &rtp_packets, int64_t captured_timestamp_us) { std::vector> to_send_rtp_packets; @@ -306,13 +301,13 @@ int PacketSenderImp::EnqueueRtpPackets( return 0; } -int PacketSenderImp::EnqueueRtpPackets( +int PacedSender::EnqueueRtpPackets( std::vector> &rtp_packets) { EnqueuePackets(std::move(rtp_packets)); return 0; } -int PacketSenderImp::EnqueueRtpPacket( +int PacedSender::EnqueueRtpPacket( std::unique_ptr rtp_packet) { EnqueuePacket(std::move(rtp_packet)); return 0; diff --git a/src/transport/packet_sender/packet_sender_imp.h b/src/transport/paced_sender/paced_sender.h similarity index 89% rename from src/transport/packet_sender/packet_sender_imp.h rename to src/transport/paced_sender/paced_sender.h index cb1e97b..27f58ee 100644 --- a/src/transport/packet_sender/packet_sender_imp.h +++ b/src/transport/paced_sender/paced_sender.h @@ -4,8 +4,8 @@ * Copyright (c) 2025 by DI JUNKUN, All Rights Reserved. */ -#ifndef _PACKET_SENDER_IMP_H_ -#define _PACKET_SENDER_IMP_H_ +#ifndef _PACED_SENDER__H_ +#define _PACED_SENDER__H_ #include @@ -18,34 +18,31 @@ #include "ice_agent.h" #include "log.h" #include "pacing_controller.h" -#include "packet_sender.h" #include "rtc_base/numerics/exp_filter.h" #include "rtp_packet_pacer.h" #include "rtp_packet_to_send.h" #include "task_queue.h" -class PacketSenderImp : public PacketSender, - public webrtc::RtpPacketPacer, - public webrtc::PacingController::PacketSender { +class PacedSender : public webrtc::RtpPacketPacer, + public webrtc::PacingController::PacketSender { public: static const int kNoPacketHoldback; - PacketSenderImp(std::shared_ptr ice_agent, - std::shared_ptr clock, - std::shared_ptr task_queue); - ~PacketSenderImp(); + PacedSender(std::shared_ptr ice_agent, + std::shared_ptr clock, + std::shared_ptr task_queue); + ~PacedSender(); public: - int Send() override { return 0; } + int Send() { return 0; } int EnqueueRtpPackets(std::vector>& rtp_packets, - int64_t captured_timestamp_us) override; + int64_t captured_timestamp_us); - int EnqueueRtpPackets(std::vector>& - rtp_packets) override; + int EnqueueRtpPackets( + std::vector>& rtp_packets); - int EnqueueRtpPacket( - std::unique_ptr rtp_packet) override; + int EnqueueRtpPacket(std::unique_ptr rtp_packet); public: void SetOnSentPacketFunc( @@ -106,7 +103,7 @@ class PacketSenderImp : public PacketSender, // Methods implementing RtpPacketSender. // Adds the packet to the queue and calls - // PacingController::PacketSenderImp::SendPacket() when it's time to send. + // PacingController::PacedSender::SendPacket() when it's time to send. void EnqueuePackets( std::vector> packets); void EnqueuePacket(std::unique_ptr packet); diff --git a/src/transport/packet_sender/packet_sender.h b/src/transport/packet_sender/packet_sender.h deleted file mode 100644 index 91ad1dd..0000000 --- a/src/transport/packet_sender/packet_sender.h +++ /dev/null @@ -1,33 +0,0 @@ -/* - * @Author: DI JUNKUN - * @Date: 2025-03-17 - * Copyright (c) 2025 by DI JUNKUN, All Rights Reserved. - */ - -#ifndef _PACKET_SENDER_H_ -#define _PACKET_SENDER_H_ - -#include -#include - -#include "rtp_packet.h" -#include "rtp_packet_to_send.h" - -class PacketSender { - public: - PacketSender() {} - virtual ~PacketSender() {} - - virtual int Send() = 0; - virtual int EnqueueRtpPackets( - std::vector>& rtp_packets, - int64_t captured_timestamp_us) = 0; - - virtual int EnqueueRtpPackets( - std::vector>& rtp_packets) = 0; - - virtual int EnqueueRtpPacket( - std::unique_ptr rtp_packet) = 0; -}; - -#endif \ No newline at end of file diff --git a/xmake.lua b/xmake.lua index efaaef4..733b570 100644 --- a/xmake.lua +++ b/xmake.lua @@ -142,10 +142,10 @@ target("transport") add_deps("log", "ws", "ice", "rtp", "rtcp", "statistics", "media", "qos") add_files("src/transport/*.cpp", "src/transport/channel/*.cpp", - "src/transport/packet_sender/*.cpp") + "src/transport/paced_sender/*.cpp") add_includedirs("src/transport", "src/transport/channel", - "src/transport/packet_sender", {public = true}) + "src/transport/paced_sender", {public = true}) target("media") set_kind("object")