From b0306d510c991d872972a33f82f82e79bc9565a3 Mon Sep 17 00:00:00 2001 From: dijunkun Date: Mon, 17 Mar 2025 17:19:46 +0800 Subject: [PATCH] [refactor] move channel module into transport module --- src/ice/ice_agent.cpp | 2 +- src/qos/bitrate_prober.cc | 5 +- src/qos/pacing_controller.cc | 2 + src/qos/probe_controller.cc | 1 - .../channel}/audio_channel_receive.cpp | 0 .../channel}/audio_channel_receive.h | 0 .../channel}/audio_channel_send.cpp | 0 .../channel}/audio_channel_send.h | 0 .../channel}/data_channel_receive.cpp | 0 .../channel}/data_channel_receive.h | 0 .../channel}/data_channel_send.cpp | 0 .../channel}/data_channel_send.h | 0 .../channel}/rtp_audio_receiver.cpp | 0 .../channel}/rtp_audio_receiver.h | 0 .../channel}/rtp_audio_sender.cpp | 0 .../channel}/rtp_audio_sender.h | 0 .../channel}/rtp_data_receiver.cpp | 0 .../channel}/rtp_data_receiver.h | 0 .../channel}/rtp_data_sender.cpp | 0 .../channel}/rtp_data_sender.h | 0 .../channel}/rtp_video_receiver.cpp | 6 +- .../channel}/rtp_video_receiver.h | 2 +- .../channel}/rtp_video_sender.cpp | 0 .../channel}/rtp_video_sender.h | 0 .../channel}/video_channel_receive.cpp | 5 +- .../channel}/video_channel_receive.h | 2 +- .../channel}/video_channel_send.cpp | 0 .../channel}/video_channel_send.h | 0 src/transport/ice_transport.cpp | 4 +- src/transport/ice_transport_controller.cpp | 6 +- src/transport/ice_transport_controller.h | 6 +- src/transport/packet_sender/packet_sender.h | 25 ++++ .../packet_sender_imp.cpp} | 121 +++++++++--------- .../packet_sender_imp.h} | 27 ++-- xmake.lua | 18 +-- 35 files changed, 134 insertions(+), 98 deletions(-) rename src/{channel/meida_channel => transport/channel}/audio_channel_receive.cpp (100%) rename src/{channel/meida_channel => transport/channel}/audio_channel_receive.h (100%) rename src/{channel/meida_channel => transport/channel}/audio_channel_send.cpp (100%) rename src/{channel/meida_channel => transport/channel}/audio_channel_send.h (100%) rename src/{channel/meida_channel => transport/channel}/data_channel_receive.cpp (100%) rename src/{channel/meida_channel => transport/channel}/data_channel_receive.h (100%) rename src/{channel/meida_channel => transport/channel}/data_channel_send.cpp (100%) rename src/{channel/meida_channel => transport/channel}/data_channel_send.h (100%) rename src/{channel/rtp_channel => transport/channel}/rtp_audio_receiver.cpp (100%) rename src/{channel/rtp_channel => transport/channel}/rtp_audio_receiver.h (100%) rename src/{channel/rtp_channel => transport/channel}/rtp_audio_sender.cpp (100%) rename src/{channel/rtp_channel => transport/channel}/rtp_audio_sender.h (100%) rename src/{channel/rtp_channel => transport/channel}/rtp_data_receiver.cpp (100%) rename src/{channel/rtp_channel => transport/channel}/rtp_data_receiver.h (100%) rename src/{channel/rtp_channel => transport/channel}/rtp_data_sender.cpp (100%) rename src/{channel/rtp_channel => transport/channel}/rtp_data_sender.h (100%) rename src/{channel/rtp_channel => transport/channel}/rtp_video_receiver.cpp (99%) rename src/{channel/rtp_channel => transport/channel}/rtp_video_receiver.h (98%) rename src/{channel/rtp_channel => transport/channel}/rtp_video_sender.cpp (100%) rename src/{channel/rtp_channel => transport/channel}/rtp_video_sender.h (100%) rename src/{channel/meida_channel => transport/channel}/video_channel_receive.cpp (93%) rename src/{channel/meida_channel => transport/channel}/video_channel_receive.h (94%) rename src/{channel/meida_channel => transport/channel}/video_channel_send.cpp (100%) rename src/{channel/meida_channel => transport/channel}/video_channel_send.h (100%) create mode 100644 src/transport/packet_sender/packet_sender.h rename src/transport/{packet_sender.cpp => packet_sender/packet_sender_imp.cpp} (68%) rename src/transport/{packet_sender.h => packet_sender/packet_sender_imp.h} (90%) diff --git a/src/ice/ice_agent.cpp b/src/ice/ice_agent.cpp index 71bf22e..bd4c961 100644 --- a/src/ice/ice_agent.cpp +++ b/src/ice/ice_agent.cpp @@ -365,7 +365,7 @@ int IceAgent::Send(const char *data, size_t size) { } if (destroyed_) { - LOG_ERROR("Nice agent is destroyed"); + // LOG_ERROR("Nice agent is destroyed"); return -1; } diff --git a/src/qos/bitrate_prober.cc b/src/qos/bitrate_prober.cc index 53c7607..531befa 100644 --- a/src/qos/bitrate_prober.cc +++ b/src/qos/bitrate_prober.cc @@ -60,6 +60,7 @@ void BitrateProber::MaybeSetActiveState(DataSize packet_size) { if (ReadyToSetActiveState(packet_size)) { next_probe_time_ = Timestamp::MinusInfinity(); probing_state_ = ProbingState::kActive; + LOG_WARN("Probing set to active"); } } @@ -69,6 +70,7 @@ bool BitrateProber::ReadyToSetActiveState(DataSize packet_size) const { } switch (probing_state_) { case ProbingState::kDisabled: + return false; case ProbingState::kActive: return false; case ProbingState::kInactive: @@ -91,6 +93,7 @@ void BitrateProber::OnIncomingPacket(DataSize packet_size) { void BitrateProber::CreateProbeCluster( const ProbeClusterConfig& cluster_config) { + LOG_WARN("a1"); while (!clusters_.empty() && (cluster_config.at_time - clusters_.front().requested_at > kProbeClusterTimeout || @@ -109,7 +112,7 @@ void BitrateProber::CreateProbeCluster( cluster.pace_info.send_bitrate = cluster_config.target_data_rate; cluster.pace_info.probe_cluster_id = cluster_config.id; clusters_.push(cluster); - + LOG_WARN("a1 clusters size = {}", clusters_.size()); MaybeSetActiveState(/*packet_size=*/DataSize::Zero()); LOG_INFO("Probe cluster (bitrate_bps:min bytes:min packets): ({}:{}:{}, {})", diff --git a/src/qos/pacing_controller.cc b/src/qos/pacing_controller.cc index 4785b0e..5272b6a 100644 --- a/src/qos/pacing_controller.cc +++ b/src/qos/pacing_controller.cc @@ -92,7 +92,9 @@ PacingController::~PacingController() = default; void PacingController::CreateProbeClusters( rtc::ArrayView probe_cluster_configs) { + LOG_WARN("b0"); for (const ProbeClusterConfig probe_cluster_config : probe_cluster_configs) { + LOG_WARN("b1"); prober_.CreateProbeCluster(probe_cluster_config); } } diff --git a/src/qos/probe_controller.cc b/src/qos/probe_controller.cc index af814c4..4229d04 100644 --- a/src/qos/probe_controller.cc +++ b/src/qos/probe_controller.cc @@ -531,7 +531,6 @@ std::vector ProbeController::InitiateProbing( } pending_probes.push_back(CreateProbeClusterConfig(now, bitrate)); } - LOG_ERROR("2 pending probes size {}", pending_probes.size()); time_last_probing_initiated_ = now; if (probe_further) { UpdateState(State::kWaitingForProbingResult); diff --git a/src/channel/meida_channel/audio_channel_receive.cpp b/src/transport/channel/audio_channel_receive.cpp similarity index 100% rename from src/channel/meida_channel/audio_channel_receive.cpp rename to src/transport/channel/audio_channel_receive.cpp diff --git a/src/channel/meida_channel/audio_channel_receive.h b/src/transport/channel/audio_channel_receive.h similarity index 100% rename from src/channel/meida_channel/audio_channel_receive.h rename to src/transport/channel/audio_channel_receive.h diff --git a/src/channel/meida_channel/audio_channel_send.cpp b/src/transport/channel/audio_channel_send.cpp similarity index 100% rename from src/channel/meida_channel/audio_channel_send.cpp rename to src/transport/channel/audio_channel_send.cpp diff --git a/src/channel/meida_channel/audio_channel_send.h b/src/transport/channel/audio_channel_send.h similarity index 100% rename from src/channel/meida_channel/audio_channel_send.h rename to src/transport/channel/audio_channel_send.h diff --git a/src/channel/meida_channel/data_channel_receive.cpp b/src/transport/channel/data_channel_receive.cpp similarity index 100% rename from src/channel/meida_channel/data_channel_receive.cpp rename to src/transport/channel/data_channel_receive.cpp diff --git a/src/channel/meida_channel/data_channel_receive.h b/src/transport/channel/data_channel_receive.h similarity index 100% rename from src/channel/meida_channel/data_channel_receive.h rename to src/transport/channel/data_channel_receive.h diff --git a/src/channel/meida_channel/data_channel_send.cpp b/src/transport/channel/data_channel_send.cpp similarity index 100% rename from src/channel/meida_channel/data_channel_send.cpp rename to src/transport/channel/data_channel_send.cpp diff --git a/src/channel/meida_channel/data_channel_send.h b/src/transport/channel/data_channel_send.h similarity index 100% rename from src/channel/meida_channel/data_channel_send.h rename to src/transport/channel/data_channel_send.h diff --git a/src/channel/rtp_channel/rtp_audio_receiver.cpp b/src/transport/channel/rtp_audio_receiver.cpp similarity index 100% rename from src/channel/rtp_channel/rtp_audio_receiver.cpp rename to src/transport/channel/rtp_audio_receiver.cpp diff --git a/src/channel/rtp_channel/rtp_audio_receiver.h b/src/transport/channel/rtp_audio_receiver.h similarity index 100% rename from src/channel/rtp_channel/rtp_audio_receiver.h rename to src/transport/channel/rtp_audio_receiver.h diff --git a/src/channel/rtp_channel/rtp_audio_sender.cpp b/src/transport/channel/rtp_audio_sender.cpp similarity index 100% rename from src/channel/rtp_channel/rtp_audio_sender.cpp rename to src/transport/channel/rtp_audio_sender.cpp diff --git a/src/channel/rtp_channel/rtp_audio_sender.h b/src/transport/channel/rtp_audio_sender.h similarity index 100% rename from src/channel/rtp_channel/rtp_audio_sender.h rename to src/transport/channel/rtp_audio_sender.h diff --git a/src/channel/rtp_channel/rtp_data_receiver.cpp b/src/transport/channel/rtp_data_receiver.cpp similarity index 100% rename from src/channel/rtp_channel/rtp_data_receiver.cpp rename to src/transport/channel/rtp_data_receiver.cpp diff --git a/src/channel/rtp_channel/rtp_data_receiver.h b/src/transport/channel/rtp_data_receiver.h similarity index 100% rename from src/channel/rtp_channel/rtp_data_receiver.h rename to src/transport/channel/rtp_data_receiver.h diff --git a/src/channel/rtp_channel/rtp_data_sender.cpp b/src/transport/channel/rtp_data_sender.cpp similarity index 100% rename from src/channel/rtp_channel/rtp_data_sender.cpp rename to src/transport/channel/rtp_data_sender.cpp diff --git a/src/channel/rtp_channel/rtp_data_sender.h b/src/transport/channel/rtp_data_sender.h similarity index 100% rename from src/channel/rtp_channel/rtp_data_sender.h rename to src/transport/channel/rtp_data_sender.h diff --git a/src/channel/rtp_channel/rtp_video_receiver.cpp b/src/transport/channel/rtp_video_receiver.cpp similarity index 99% rename from src/channel/rtp_channel/rtp_video_receiver.cpp rename to src/transport/channel/rtp_video_receiver.cpp index 3d8f20c..55a85cc 100644 --- a/src/channel/rtp_channel/rtp_video_receiver.cpp +++ b/src/transport/channel/rtp_video_receiver.cpp @@ -88,7 +88,7 @@ RtpVideoReceiver::~RtpVideoReceiver() { #endif } -void RtpVideoReceiver::InsertRtpPacket(RtpPacket& rtp_packet) { +void RtpVideoReceiver::InsertRtpPacket(RtpPacket& rtp_packet, bool padding) { if (!rtp_statistics_) { rtp_statistics_ = std::make_unique(); rtp_statistics_->Start(); @@ -194,6 +194,10 @@ void RtpVideoReceiver::InsertRtpPacket(RtpPacket& rtp_packet) { // // SendRtcpRR(rtcp_rr); // } + if (padding) { + return; + } + if (rtp_packet.PayloadType() == rtp::PAYLOAD_TYPE::AV1) { RtpPacketAv1 rtp_packet_av1; rtp_packet_av1.Build(rtp_packet.Buffer().data(), rtp_packet.Size()); diff --git a/src/channel/rtp_channel/rtp_video_receiver.h b/src/transport/channel/rtp_video_receiver.h similarity index 98% rename from src/channel/rtp_channel/rtp_video_receiver.h rename to src/transport/channel/rtp_video_receiver.h index 1d04e8b..4f7c3ce 100644 --- a/src/channel/rtp_channel/rtp_video_receiver.h +++ b/src/transport/channel/rtp_video_receiver.h @@ -36,7 +36,7 @@ class RtpVideoReceiver : public ThreadBase, virtual ~RtpVideoReceiver(); public: - void InsertRtpPacket(RtpPacket& rtp_packet); + void InsertRtpPacket(RtpPacket& rtp_packet, bool padding); void SetSendDataFunc(std::function data_send_func); diff --git a/src/channel/rtp_channel/rtp_video_sender.cpp b/src/transport/channel/rtp_video_sender.cpp similarity index 100% rename from src/channel/rtp_channel/rtp_video_sender.cpp rename to src/transport/channel/rtp_video_sender.cpp diff --git a/src/channel/rtp_channel/rtp_video_sender.h b/src/transport/channel/rtp_video_sender.h similarity index 100% rename from src/channel/rtp_channel/rtp_video_sender.h rename to src/transport/channel/rtp_video_sender.h diff --git a/src/channel/meida_channel/video_channel_receive.cpp b/src/transport/channel/video_channel_receive.cpp similarity index 93% rename from src/channel/meida_channel/video_channel_receive.cpp rename to src/transport/channel/video_channel_receive.cpp index 4cdca32..ce72af5 100644 --- a/src/channel/meida_channel/video_channel_receive.cpp +++ b/src/transport/channel/video_channel_receive.cpp @@ -50,7 +50,8 @@ void VideoChannelReceive::Destroy() { } } -int VideoChannelReceive::OnReceiveRtpPacket(const char *data, size_t size) { +int VideoChannelReceive::OnReceiveRtpPacket(const char *data, size_t size, + bool padding) { if (ice_io_statistics_) { ice_io_statistics_->UpdateVideoInboundBytes((uint32_t)size); } @@ -58,7 +59,7 @@ int VideoChannelReceive::OnReceiveRtpPacket(const char *data, size_t size) { if (rtp_video_receiver_) { RtpPacket rtp_packet; rtp_packet.Build((uint8_t *)data, (uint32_t)size); - rtp_video_receiver_->InsertRtpPacket(rtp_packet); + rtp_video_receiver_->InsertRtpPacket(rtp_packet, padding); } return 0; diff --git a/src/channel/meida_channel/video_channel_receive.h b/src/transport/channel/video_channel_receive.h similarity index 94% rename from src/channel/meida_channel/video_channel_receive.h rename to src/transport/channel/video_channel_receive.h index 3fbaa27..d7caffc 100644 --- a/src/channel/meida_channel/video_channel_receive.h +++ b/src/transport/channel/video_channel_receive.h @@ -39,7 +39,7 @@ class VideoChannelReceive { return 0; } - int OnReceiveRtpPacket(const char *data, size_t size); + int OnReceiveRtpPacket(const char *data, size_t size, bool padding); void OnSenderReport(const SenderReport &sender_report) { if (rtp_video_receiver_) { diff --git a/src/channel/meida_channel/video_channel_send.cpp b/src/transport/channel/video_channel_send.cpp similarity index 100% rename from src/channel/meida_channel/video_channel_send.cpp rename to src/transport/channel/video_channel_send.cpp diff --git a/src/channel/meida_channel/video_channel_send.h b/src/transport/channel/video_channel_send.h similarity index 100% rename from src/channel/meida_channel/video_channel_send.h rename to src/transport/channel/video_channel_send.h diff --git a/src/transport/ice_transport.cpp b/src/transport/ice_transport.cpp index 82f18f1..eb7d262 100644 --- a/src/transport/ice_transport.cpp +++ b/src/transport/ice_transport.cpp @@ -194,7 +194,7 @@ void IceTransport::OnReceiveBuffer(NiceAgent *agent, guint stream_id, if (!is_closed_) { if (CheckIsRtpPacket(buffer, size)) { if (CheckIsVideoPacket(buffer, size) && ice_transport_controller_) { - ice_transport_controller_->OnReceiveVideoRtpPacket(buffer, size); + ice_transport_controller_->OnReceiveVideoRtpPacket(buffer, size, false); } else if (CheckIsAudioPacket(buffer, size) && ice_transport_controller_) { ice_transport_controller_->OnReceiveAudioRtpPacket(buffer, size); @@ -206,7 +206,7 @@ void IceTransport::OnReceiveBuffer(NiceAgent *agent, guint stream_id, RtcpPacketInfo rtcp_packet_info; ParseRtcpPacket((const uint8_t *)buffer, size, &rtcp_packet_info); } else if (CheckIsRtpPaddingPacket(buffer, size)) { - // LOG_WARN("Rtp padding packet"); + ice_transport_controller_->OnReceiveVideoRtpPacket(buffer, size, true); } else { LOG_ERROR("Unknown packet"); } diff --git a/src/transport/ice_transport_controller.cpp b/src/transport/ice_transport_controller.cpp index c61ac50..f514282 100644 --- a/src/transport/ice_transport_controller.cpp +++ b/src/transport/ice_transport_controller.cpp @@ -53,7 +53,7 @@ void IceTransportController::Create( CreateAudioCodec(); controller_ = std::make_unique(); - packet_sender_ = std::make_unique(ice_agent, webrtc_clock_); + packet_sender_ = std::make_unique(ice_agent, webrtc_clock_); packet_sender_->SetPacingRates(DataRate::BitsPerSec(300000), DataRate::Zero()); packet_sender_->SetOnSentPacketFunc( @@ -239,9 +239,9 @@ void IceTransportController::UpdateNetworkAvaliablity(bool network_available) { } int IceTransportController::OnReceiveVideoRtpPacket(const char* data, - size_t size) { + size_t size, bool padding) { if (video_channel_receive_) { - return video_channel_receive_->OnReceiveRtpPacket(data, size); + return video_channel_receive_->OnReceiveRtpPacket(data, size, padding); } return -1; diff --git a/src/transport/ice_transport_controller.h b/src/transport/ice_transport_controller.h index 5318cd9..4b3941e 100644 --- a/src/transport/ice_transport_controller.h +++ b/src/transport/ice_transport_controller.h @@ -21,7 +21,7 @@ #include "data_channel_receive.h" #include "data_channel_send.h" #include "ice_agent.h" -#include "packet_sender.h" +#include "packet_sender_imp.h" #include "resolution_adapter.h" #include "transport_feedback_adapter.h" #include "video_channel_receive.h" @@ -60,7 +60,7 @@ class IceTransportController void UpdateNetworkAvaliablity(bool network_available); - int OnReceiveVideoRtpPacket(const char *data, size_t size); + int OnReceiveVideoRtpPacket(const char *data, size_t size, bool padding); int OnReceiveAudioRtpPacket(const char *data, size_t size); int OnReceiveDataRtpPacket(const char *data, size_t size); @@ -107,7 +107,7 @@ class IceTransportController std::shared_ptr ice_agent_ = nullptr; std::shared_ptr ice_io_statistics_ = nullptr; std::unique_ptr rtp_packetizer_ = nullptr; - std::unique_ptr packet_sender_ = nullptr; + std::unique_ptr packet_sender_ = nullptr; std::string remote_user_id_; void *user_data_ = nullptr; diff --git a/src/transport/packet_sender/packet_sender.h b/src/transport/packet_sender/packet_sender.h new file mode 100644 index 0000000..19e39c7 --- /dev/null +++ b/src/transport/packet_sender/packet_sender.h @@ -0,0 +1,25 @@ +/* + * @Author: DI JUNKUN + * @Date: 2025-03-17 + * Copyright (c) 2025 by DI JUNKUN, All Rights Reserved. + */ + +#ifndef _PACKET_SENDER_H_ +#define _PACKET_SENDER_H_ + +#include +#include + +#include "rtp_packet.h" + +class PacketSender { + public: + PacketSender() {} + virtual ~PacketSender() {} + + virtual int Send() = 0; + virtual int InsertRtpPacket( + std::vector> &rtp_packets) = 0; +}; + +#endif \ No newline at end of file diff --git a/src/transport/packet_sender.cpp b/src/transport/packet_sender/packet_sender_imp.cpp similarity index 68% rename from src/transport/packet_sender.cpp rename to src/transport/packet_sender/packet_sender_imp.cpp index 902fd7c..b8352bd 100644 --- a/src/transport/packet_sender.cpp +++ b/src/transport/packet_sender/packet_sender_imp.cpp @@ -1,12 +1,12 @@ -#include "packet_sender.h" +#include "packet_sender_imp.h" #include "log.h" -const int PacketSender::kNoPacketHoldback = -1; +const int PacketSenderImp::kNoPacketHoldback = -1; -PacketSender::PacketSender(std::shared_ptr ice_agent, - std::shared_ptr clock) +PacketSenderImp::PacketSenderImp(std::shared_ptr ice_agent, + std::shared_ptr clock) : ice_agent_(ice_agent), clock_(clock), pacing_controller_(clock.get(), this), @@ -18,10 +18,10 @@ PacketSender::PacketSender(std::shared_ptr ice_agent, packet_size_(/*alpha=*/0.95), include_overhead_(false) {} -PacketSender::~PacketSender() {} +PacketSenderImp::~PacketSenderImp() {} std::vector> -PacketSender::GeneratePadding(webrtc::DataSize size) { +PacketSenderImp::GeneratePadding(webrtc::DataSize size) { std::vector> to_send_rtp_packets; std::vector> rtp_packets = generat_padding_func_(size.bytes(), clock_->CurrentTime().ms()); @@ -39,97 +39,105 @@ PacketSender::GeneratePadding(webrtc::DataSize size) { return to_send_rtp_packets; } -void PacketSender::SetSendBurstInterval(webrtc::TimeDelta burst_interval) { +void PacketSenderImp::SetSendBurstInterval(webrtc::TimeDelta burst_interval) { pacing_controller_.SetSendBurstInterval(burst_interval); } -void PacketSender::SetAllowProbeWithoutMediaPacket(bool allow) { +void PacketSenderImp::SetAllowProbeWithoutMediaPacket(bool allow) { pacing_controller_.SetAllowProbeWithoutMediaPacket(allow); } -void PacketSender::EnsureStarted() { +void PacketSenderImp::EnsureStarted() { is_started_ = true; MaybeProcessPackets(webrtc::Timestamp::MinusInfinity()); } -void PacketSender::Pause() { pacing_controller_.Pause(); } +void PacketSenderImp::CreateProbeClusters( + std::vector probe_cluster_configs) { + pacing_controller_.CreateProbeClusters(probe_cluster_configs); + MaybeScheduleProcessPackets(); +} -void PacketSender::Resume() { +void PacketSenderImp::Pause() { pacing_controller_.Pause(); } + +void PacketSenderImp::Resume() { pacing_controller_.Resume(); MaybeProcessPackets(webrtc::Timestamp::MinusInfinity()); } -void PacketSender::SetCongested(bool congested) { +void PacketSenderImp::SetCongested(bool congested) { pacing_controller_.SetCongested(congested); MaybeScheduleProcessPackets(); } -void PacketSender::SetPacingRates(webrtc::DataRate pacing_rate, - webrtc::DataRate padding_rate) { +void PacketSenderImp::SetPacingRates(webrtc::DataRate pacing_rate, + webrtc::DataRate padding_rate) { pacing_controller_.SetPacingRates(pacing_rate, padding_rate); MaybeScheduleProcessPackets(); } -void PacketSender::EnqueuePackets( +void PacketSenderImp::EnqueuePackets( std::vector> packets) { - webrtc::PacedPacketInfo cluster_info; - for (auto &packet : packets) { - SendPacket(std::move(packet), cluster_info); - } - // task_queue_.PostTask([this, packets = std::move(packets)]() mutable { - // for (auto &packet : packets) { - // size_t packet_size = packet->payload_size() + packet->padding_size(); - // if (include_overhead_) { - // packet_size += packet->headers_size(); - // } - // packet_size_.Apply(1, packet_size); - // pacing_controller_.EnqueuePacket(std::move(packet)); - // } - // MaybeProcessPackets(webrtc::Timestamp::MinusInfinity()); - // }); + task_queue_.PostTask([this, packets = std::move(packets)]() mutable { + for (auto &packet : packets) { + size_t packet_size = packet->payload_size() + packet->padding_size(); + if (include_overhead_) { + packet_size += packet->headers_size(); + } + packet_size_.Apply(1, packet_size); + pacing_controller_.EnqueuePacket(std::move(packet)); + } + MaybeProcessPackets(webrtc::Timestamp::MinusInfinity()); + }); + + // webrtc::PacedPacketInfo cluster_info; + // for (auto &packet : packets) { + // SendPacket(std::move(packet), cluster_info); + // } } -void PacketSender::RemovePacketsForSsrc(uint32_t ssrc) { +void PacketSenderImp::RemovePacketsForSsrc(uint32_t ssrc) { task_queue_.PostTask([this, ssrc] { pacing_controller_.RemovePacketsForSsrc(ssrc); MaybeProcessPackets(webrtc::Timestamp::MinusInfinity()); }); } -void PacketSender::SetAccountForAudioPackets(bool account_for_audio) { +void PacketSenderImp::SetAccountForAudioPackets(bool account_for_audio) { pacing_controller_.SetAccountForAudioPackets(account_for_audio); MaybeProcessPackets(webrtc::Timestamp::MinusInfinity()); } -void PacketSender::SetIncludeOverhead() { +void PacketSenderImp::SetIncludeOverhead() { include_overhead_ = true; pacing_controller_.SetIncludeOverhead(); MaybeProcessPackets(webrtc::Timestamp::MinusInfinity()); } -void PacketSender::SetTransportOverhead(webrtc::DataSize overhead_per_packet) { +void PacketSenderImp::SetTransportOverhead( + webrtc::DataSize overhead_per_packet) { pacing_controller_.SetTransportOverhead(overhead_per_packet); MaybeProcessPackets(webrtc::Timestamp::MinusInfinity()); } -void PacketSender::SetQueueTimeLimit(webrtc::TimeDelta limit) { +void PacketSenderImp::SetQueueTimeLimit(webrtc::TimeDelta limit) { pacing_controller_.SetQueueTimeLimit(limit); MaybeProcessPackets(webrtc::Timestamp::MinusInfinity()); } -webrtc::TimeDelta PacketSender::ExpectedQueueTime() const { +webrtc::TimeDelta PacketSenderImp::ExpectedQueueTime() const { return GetStats().expected_queue_time; } -webrtc::DataSize PacketSender::QueueSizeData() const { +webrtc::DataSize PacketSenderImp::QueueSizeData() const { return GetStats().queue_size; } -std::optional PacketSender::FirstSentPacketTime() const { +std::optional PacketSenderImp::FirstSentPacketTime() const { return GetStats().first_sent_packet_time; } -webrtc::TimeDelta PacketSender::OldestPacketWaitTime() const { +webrtc::TimeDelta PacketSenderImp::OldestPacketWaitTime() const { webrtc::Timestamp oldest_packet = GetStats().oldest_packet_enqueue_time; if (oldest_packet.IsInfinite()) { return webrtc::TimeDelta::Zero(); @@ -144,17 +152,11 @@ webrtc::TimeDelta PacketSender::OldestPacketWaitTime() const { return current - oldest_packet; } -void PacketSender::CreateProbeClusters( - std::vector probe_cluster_configs) { - pacing_controller_.CreateProbeClusters(probe_cluster_configs); - MaybeScheduleProcessPackets(); -} - -void PacketSender::OnStatsUpdated(const Stats &stats) { +void PacketSenderImp::OnStatsUpdated(const Stats &stats) { current_stats_ = stats; } -void PacketSender::MaybeScheduleProcessPackets() { +void PacketSenderImp::MaybeScheduleProcessPackets() { LOG_ERROR("x1"); if (!processing_packets_) { LOG_ERROR("x2"); @@ -162,7 +164,7 @@ void PacketSender::MaybeScheduleProcessPackets() { } } -void PacketSender::MaybeProcessPackets( +void PacketSenderImp::MaybeProcessPackets( webrtc::Timestamp scheduled_process_time) { if (is_shutdown_ || !is_started_) { LOG_ERROR("shutdown {}, started {}", is_shutdown_, is_started_); @@ -171,9 +173,8 @@ void PacketSender::MaybeProcessPackets( // Protects against re-entry from transport feedback calling into the task // queue pacer. - processing_packets_ = true; - // auto cleanup = std::unique_ptr>( - // nullptr, [this](void *) { processing_packets_ = false; }); + auto cleanup = std::unique_ptr>( + nullptr, [this](void *) { processing_packets_ = false; }); webrtc::Timestamp next_send_time = pacing_controller_.NextSendTime(); const webrtc::Timestamp now = clock_->CurrentTime(); @@ -230,20 +231,14 @@ void PacketSender::MaybeProcessPackets( if (next_process_time_.IsMinusInfinity() || next_process_time_ > next_send_time) { // Prefer low precision if allowed and not probing. - // task_queue_->PostDelayedHighPrecisionTask( - // SafeTask( - // safety_.flag(), - // [this, next_send_time]() { MaybeProcessPackets(next_send_time); - // }), - MaybeProcessPackets(next_send_time); - time_to_next_process.RoundUpTo(webrtc::TimeDelta::Millis(1)); + task_queue_.PostDelayedTask( + [this, next_send_time]() { MaybeProcessPackets(next_send_time); }, + time_to_next_process.RoundUpTo(webrtc::TimeDelta::Millis(1)).ms()); next_process_time_ = next_send_time; } - - processing_packets_ = false; } -void PacketSender::UpdateStats() { +void PacketSenderImp::UpdateStats() { Stats new_stats; new_stats.expected_queue_time = pacing_controller_.ExpectedQueueTime(); new_stats.first_sent_packet_time = pacing_controller_.FirstSentPacketTime(); @@ -253,4 +248,6 @@ void PacketSender::UpdateStats() { OnStatsUpdated(new_stats); } -PacketSender::Stats PacketSender::GetStats() const { return current_stats_; } \ No newline at end of file +PacketSenderImp::Stats PacketSenderImp::GetStats() const { + return current_stats_; +} \ No newline at end of file diff --git a/src/transport/packet_sender.h b/src/transport/packet_sender/packet_sender_imp.h similarity index 90% rename from src/transport/packet_sender.h rename to src/transport/packet_sender/packet_sender_imp.h index a3da1d6..c27cee8 100644 --- a/src/transport/packet_sender.h +++ b/src/transport/packet_sender/packet_sender_imp.h @@ -4,8 +4,8 @@ * Copyright (c) 2025 by DI JUNKUN, All Rights Reserved. */ -#ifndef _PACKET_SENDER_H_ -#define _PACKET_SENDER_H_ +#ifndef _PACKET_SENDER_IMP_H_ +#define _PACKET_SENDER_IMP_H_ #include @@ -18,19 +18,28 @@ #include "ice_agent.h" #include "log.h" #include "pacing_controller.h" +#include "packet_sender.h" #include "rtc_base/numerics/exp_filter.h" #include "rtp_packet_pacer.h" #include "rtp_packet_to_send.h" #include "task_queue.h" -class PacketSender : public webrtc::RtpPacketPacer, - public webrtc::PacingController::PacketSender { +class PacketSenderImp : public PacketSender, + public webrtc::RtpPacketPacer, + public webrtc::PacingController::PacketSender { public: static const int kNoPacketHoldback; - PacketSender(std::shared_ptr ice_agent, - std::shared_ptr clock); - ~PacketSender(); + PacketSenderImp(std::shared_ptr ice_agent, + std::shared_ptr clock); + ~PacketSenderImp(); + + public: + int Send() { return 0; } + + int InsertRtpPacket(std::vector>& rtp_packets) { + return 0; + } void SetOnSentPacketFunc( std::function on_sent_packet_func) { @@ -58,7 +67,7 @@ class PacketSender : public webrtc::RtpPacketPacer, std::vector> GeneratePadding( webrtc::DataSize size) override; - // TODO(bugs.webrtc.org/1439830): Make pure once subclasses adapt. + // TODO(bugs.webrtc.org/1439830): Make pure once subclasses adapt. void OnBatchComplete() override {} // TODO(bugs.webrtc.org/11340): Make pure once downstream projects @@ -83,7 +92,7 @@ class PacketSender : public webrtc::RtpPacketPacer, // Methods implementing RtpPacketSender. // Adds the packet to the queue and calls - // PacingController::PacketSender::SendPacket() when it's time to send. + // PacingController::PacketSenderImp::SendPacket() when it's time to send. void EnqueuePackets( std::vector> packets); // Remove any pending packets matching this SSRC from the packet queue. diff --git a/xmake.lua b/xmake.lua index 99d42f9..bbd3029 100644 --- a/xmake.lua +++ b/xmake.lua @@ -138,19 +138,15 @@ target("qos") "src/qos/*.cpp") add_includedirs("src/qos", {public = true}) -target("channel") - set_kind("object") - add_deps("log", "rtp", "rtcp", "ice", "qos") - add_files("src/channel/meida_channel/*.cpp", - "src/channel/rtp_channel/*.cpp") - add_includedirs("src/channel/meida_channel", - "src/channel/rtp_channel", {public = true}) - target("transport") set_kind("object") - add_deps("log", "ws", "ice", "channel", "rtp", "rtcp", "statistics", "media", "qos") - add_files("src/transport/*.cpp") - add_includedirs("src/transport", {public = true}) + add_deps("log", "ws", "ice", "rtp", "rtcp", "statistics", "media", "qos") + add_files("src/transport/*.cpp", + "src/transport/channel/*.cpp", + "src/transport/packet_sender/*.cpp") + add_includedirs("src/transport", + "src/transport/channel", + "src/transport/packet_sender", {public = true}) target("media") set_kind("object")