[refactor] move channel module into transport module

This commit is contained in:
dijunkun
2025-03-17 17:19:46 +08:00
parent 5c598be51d
commit b0306d510c
35 changed files with 134 additions and 98 deletions

View File

@@ -365,7 +365,7 @@ int IceAgent::Send(const char *data, size_t size) {
}
if (destroyed_) {
LOG_ERROR("Nice agent is destroyed");
// LOG_ERROR("Nice agent is destroyed");
return -1;
}

View File

@@ -60,6 +60,7 @@ void BitrateProber::MaybeSetActiveState(DataSize packet_size) {
if (ReadyToSetActiveState(packet_size)) {
next_probe_time_ = Timestamp::MinusInfinity();
probing_state_ = ProbingState::kActive;
LOG_WARN("Probing set to active");
}
}
@@ -69,6 +70,7 @@ bool BitrateProber::ReadyToSetActiveState(DataSize packet_size) const {
}
switch (probing_state_) {
case ProbingState::kDisabled:
return false;
case ProbingState::kActive:
return false;
case ProbingState::kInactive:
@@ -91,6 +93,7 @@ void BitrateProber::OnIncomingPacket(DataSize packet_size) {
void BitrateProber::CreateProbeCluster(
const ProbeClusterConfig& cluster_config) {
LOG_WARN("a1");
while (!clusters_.empty() &&
(cluster_config.at_time - clusters_.front().requested_at >
kProbeClusterTimeout ||
@@ -109,7 +112,7 @@ void BitrateProber::CreateProbeCluster(
cluster.pace_info.send_bitrate = cluster_config.target_data_rate;
cluster.pace_info.probe_cluster_id = cluster_config.id;
clusters_.push(cluster);
LOG_WARN("a1 clusters size = {}", clusters_.size());
MaybeSetActiveState(/*packet_size=*/DataSize::Zero());
LOG_INFO("Probe cluster (bitrate_bps:min bytes:min packets): ({}:{}:{}, {})",

View File

@@ -92,7 +92,9 @@ PacingController::~PacingController() = default;
void PacingController::CreateProbeClusters(
rtc::ArrayView<const ProbeClusterConfig> probe_cluster_configs) {
LOG_WARN("b0");
for (const ProbeClusterConfig probe_cluster_config : probe_cluster_configs) {
LOG_WARN("b1");
prober_.CreateProbeCluster(probe_cluster_config);
}
}

View File

@@ -531,7 +531,6 @@ std::vector<ProbeClusterConfig> ProbeController::InitiateProbing(
}
pending_probes.push_back(CreateProbeClusterConfig(now, bitrate));
}
LOG_ERROR("2 pending probes size {}", pending_probes.size());
time_last_probing_initiated_ = now;
if (probe_further) {
UpdateState(State::kWaitingForProbingResult);

View File

@@ -88,7 +88,7 @@ RtpVideoReceiver::~RtpVideoReceiver() {
#endif
}
void RtpVideoReceiver::InsertRtpPacket(RtpPacket& rtp_packet) {
void RtpVideoReceiver::InsertRtpPacket(RtpPacket& rtp_packet, bool padding) {
if (!rtp_statistics_) {
rtp_statistics_ = std::make_unique<RtpStatistics>();
rtp_statistics_->Start();
@@ -194,6 +194,10 @@ void RtpVideoReceiver::InsertRtpPacket(RtpPacket& rtp_packet) {
// // SendRtcpRR(rtcp_rr);
// }
if (padding) {
return;
}
if (rtp_packet.PayloadType() == rtp::PAYLOAD_TYPE::AV1) {
RtpPacketAv1 rtp_packet_av1;
rtp_packet_av1.Build(rtp_packet.Buffer().data(), rtp_packet.Size());

View File

@@ -36,7 +36,7 @@ class RtpVideoReceiver : public ThreadBase,
virtual ~RtpVideoReceiver();
public:
void InsertRtpPacket(RtpPacket& rtp_packet);
void InsertRtpPacket(RtpPacket& rtp_packet, bool padding);
void SetSendDataFunc(std::function<int(const char*, size_t)> data_send_func);

View File

@@ -50,7 +50,8 @@ void VideoChannelReceive::Destroy() {
}
}
int VideoChannelReceive::OnReceiveRtpPacket(const char *data, size_t size) {
int VideoChannelReceive::OnReceiveRtpPacket(const char *data, size_t size,
bool padding) {
if (ice_io_statistics_) {
ice_io_statistics_->UpdateVideoInboundBytes((uint32_t)size);
}
@@ -58,7 +59,7 @@ int VideoChannelReceive::OnReceiveRtpPacket(const char *data, size_t size) {
if (rtp_video_receiver_) {
RtpPacket rtp_packet;
rtp_packet.Build((uint8_t *)data, (uint32_t)size);
rtp_video_receiver_->InsertRtpPacket(rtp_packet);
rtp_video_receiver_->InsertRtpPacket(rtp_packet, padding);
}
return 0;

View File

@@ -39,7 +39,7 @@ class VideoChannelReceive {
return 0;
}
int OnReceiveRtpPacket(const char *data, size_t size);
int OnReceiveRtpPacket(const char *data, size_t size, bool padding);
void OnSenderReport(const SenderReport &sender_report) {
if (rtp_video_receiver_) {

View File

@@ -194,7 +194,7 @@ void IceTransport::OnReceiveBuffer(NiceAgent *agent, guint stream_id,
if (!is_closed_) {
if (CheckIsRtpPacket(buffer, size)) {
if (CheckIsVideoPacket(buffer, size) && ice_transport_controller_) {
ice_transport_controller_->OnReceiveVideoRtpPacket(buffer, size);
ice_transport_controller_->OnReceiveVideoRtpPacket(buffer, size, false);
} else if (CheckIsAudioPacket(buffer, size) &&
ice_transport_controller_) {
ice_transport_controller_->OnReceiveAudioRtpPacket(buffer, size);
@@ -206,7 +206,7 @@ void IceTransport::OnReceiveBuffer(NiceAgent *agent, guint stream_id,
RtcpPacketInfo rtcp_packet_info;
ParseRtcpPacket((const uint8_t *)buffer, size, &rtcp_packet_info);
} else if (CheckIsRtpPaddingPacket(buffer, size)) {
// LOG_WARN("Rtp padding packet");
ice_transport_controller_->OnReceiveVideoRtpPacket(buffer, size, true);
} else {
LOG_ERROR("Unknown packet");
}

View File

@@ -53,7 +53,7 @@ void IceTransportController::Create(
CreateAudioCodec();
controller_ = std::make_unique<CongestionControl>();
packet_sender_ = std::make_unique<PacketSender>(ice_agent, webrtc_clock_);
packet_sender_ = std::make_unique<PacketSenderImp>(ice_agent, webrtc_clock_);
packet_sender_->SetPacingRates(DataRate::BitsPerSec(300000),
DataRate::Zero());
packet_sender_->SetOnSentPacketFunc(
@@ -239,9 +239,9 @@ void IceTransportController::UpdateNetworkAvaliablity(bool network_available) {
}
int IceTransportController::OnReceiveVideoRtpPacket(const char* data,
size_t size) {
size_t size, bool padding) {
if (video_channel_receive_) {
return video_channel_receive_->OnReceiveRtpPacket(data, size);
return video_channel_receive_->OnReceiveRtpPacket(data, size, padding);
}
return -1;

View File

@@ -21,7 +21,7 @@
#include "data_channel_receive.h"
#include "data_channel_send.h"
#include "ice_agent.h"
#include "packet_sender.h"
#include "packet_sender_imp.h"
#include "resolution_adapter.h"
#include "transport_feedback_adapter.h"
#include "video_channel_receive.h"
@@ -60,7 +60,7 @@ class IceTransportController
void UpdateNetworkAvaliablity(bool network_available);
int OnReceiveVideoRtpPacket(const char *data, size_t size);
int OnReceiveVideoRtpPacket(const char *data, size_t size, bool padding);
int OnReceiveAudioRtpPacket(const char *data, size_t size);
int OnReceiveDataRtpPacket(const char *data, size_t size);
@@ -107,7 +107,7 @@ class IceTransportController
std::shared_ptr<IceAgent> ice_agent_ = nullptr;
std::shared_ptr<IOStatistics> ice_io_statistics_ = nullptr;
std::unique_ptr<RtpPacketizer> rtp_packetizer_ = nullptr;
std::unique_ptr<PacketSender> packet_sender_ = nullptr;
std::unique_ptr<PacketSenderImp> packet_sender_ = nullptr;
std::string remote_user_id_;
void *user_data_ = nullptr;

View File

@@ -0,0 +1,25 @@
/*
* @Author: DI JUNKUN
* @Date: 2025-03-17
* Copyright (c) 2025 by DI JUNKUN, All Rights Reserved.
*/
#ifndef _PACKET_SENDER_H_
#define _PACKET_SENDER_H_
#include <memory>
#include <vector>
#include "rtp_packet.h"
class PacketSender {
public:
PacketSender() {}
virtual ~PacketSender() {}
virtual int Send() = 0;
virtual int InsertRtpPacket(
std::vector<std::unique_ptr<RtpPacket>> &rtp_packets) = 0;
};
#endif

View File

@@ -1,12 +1,12 @@
#include "packet_sender.h"
#include "packet_sender_imp.h"
#include "log.h"
const int PacketSender::kNoPacketHoldback = -1;
const int PacketSenderImp::kNoPacketHoldback = -1;
PacketSender::PacketSender(std::shared_ptr<IceAgent> ice_agent,
std::shared_ptr<webrtc::Clock> clock)
PacketSenderImp::PacketSenderImp(std::shared_ptr<IceAgent> ice_agent,
std::shared_ptr<webrtc::Clock> clock)
: ice_agent_(ice_agent),
clock_(clock),
pacing_controller_(clock.get(), this),
@@ -18,10 +18,10 @@ PacketSender::PacketSender(std::shared_ptr<IceAgent> ice_agent,
packet_size_(/*alpha=*/0.95),
include_overhead_(false) {}
PacketSender::~PacketSender() {}
PacketSenderImp::~PacketSenderImp() {}
std::vector<std::unique_ptr<webrtc::RtpPacketToSend>>
PacketSender::GeneratePadding(webrtc::DataSize size) {
PacketSenderImp::GeneratePadding(webrtc::DataSize size) {
std::vector<std::unique_ptr<webrtc::RtpPacketToSend>> to_send_rtp_packets;
std::vector<std::unique_ptr<RtpPacket>> rtp_packets =
generat_padding_func_(size.bytes(), clock_->CurrentTime().ms());
@@ -39,97 +39,105 @@ PacketSender::GeneratePadding(webrtc::DataSize size) {
return to_send_rtp_packets;
}
void PacketSender::SetSendBurstInterval(webrtc::TimeDelta burst_interval) {
void PacketSenderImp::SetSendBurstInterval(webrtc::TimeDelta burst_interval) {
pacing_controller_.SetSendBurstInterval(burst_interval);
}
void PacketSender::SetAllowProbeWithoutMediaPacket(bool allow) {
void PacketSenderImp::SetAllowProbeWithoutMediaPacket(bool allow) {
pacing_controller_.SetAllowProbeWithoutMediaPacket(allow);
}
void PacketSender::EnsureStarted() {
void PacketSenderImp::EnsureStarted() {
is_started_ = true;
MaybeProcessPackets(webrtc::Timestamp::MinusInfinity());
}
void PacketSender::Pause() { pacing_controller_.Pause(); }
void PacketSenderImp::CreateProbeClusters(
std::vector<webrtc::ProbeClusterConfig> probe_cluster_configs) {
pacing_controller_.CreateProbeClusters(probe_cluster_configs);
MaybeScheduleProcessPackets();
}
void PacketSender::Resume() {
void PacketSenderImp::Pause() { pacing_controller_.Pause(); }
void PacketSenderImp::Resume() {
pacing_controller_.Resume();
MaybeProcessPackets(webrtc::Timestamp::MinusInfinity());
}
void PacketSender::SetCongested(bool congested) {
void PacketSenderImp::SetCongested(bool congested) {
pacing_controller_.SetCongested(congested);
MaybeScheduleProcessPackets();
}
void PacketSender::SetPacingRates(webrtc::DataRate pacing_rate,
webrtc::DataRate padding_rate) {
void PacketSenderImp::SetPacingRates(webrtc::DataRate pacing_rate,
webrtc::DataRate padding_rate) {
pacing_controller_.SetPacingRates(pacing_rate, padding_rate);
MaybeScheduleProcessPackets();
}
void PacketSender::EnqueuePackets(
void PacketSenderImp::EnqueuePackets(
std::vector<std::unique_ptr<webrtc::RtpPacketToSend>> packets) {
webrtc::PacedPacketInfo cluster_info;
for (auto &packet : packets) {
SendPacket(std::move(packet), cluster_info);
}
// task_queue_.PostTask([this, packets = std::move(packets)]() mutable {
// for (auto &packet : packets) {
// size_t packet_size = packet->payload_size() + packet->padding_size();
// if (include_overhead_) {
// packet_size += packet->headers_size();
// }
// packet_size_.Apply(1, packet_size);
// pacing_controller_.EnqueuePacket(std::move(packet));
// }
// MaybeProcessPackets(webrtc::Timestamp::MinusInfinity());
// });
task_queue_.PostTask([this, packets = std::move(packets)]() mutable {
for (auto &packet : packets) {
size_t packet_size = packet->payload_size() + packet->padding_size();
if (include_overhead_) {
packet_size += packet->headers_size();
}
packet_size_.Apply(1, packet_size);
pacing_controller_.EnqueuePacket(std::move(packet));
}
MaybeProcessPackets(webrtc::Timestamp::MinusInfinity());
});
// webrtc::PacedPacketInfo cluster_info;
// for (auto &packet : packets) {
// SendPacket(std::move(packet), cluster_info);
// }
}
void PacketSender::RemovePacketsForSsrc(uint32_t ssrc) {
void PacketSenderImp::RemovePacketsForSsrc(uint32_t ssrc) {
task_queue_.PostTask([this, ssrc] {
pacing_controller_.RemovePacketsForSsrc(ssrc);
MaybeProcessPackets(webrtc::Timestamp::MinusInfinity());
});
}
void PacketSender::SetAccountForAudioPackets(bool account_for_audio) {
void PacketSenderImp::SetAccountForAudioPackets(bool account_for_audio) {
pacing_controller_.SetAccountForAudioPackets(account_for_audio);
MaybeProcessPackets(webrtc::Timestamp::MinusInfinity());
}
void PacketSender::SetIncludeOverhead() {
void PacketSenderImp::SetIncludeOverhead() {
include_overhead_ = true;
pacing_controller_.SetIncludeOverhead();
MaybeProcessPackets(webrtc::Timestamp::MinusInfinity());
}
void PacketSender::SetTransportOverhead(webrtc::DataSize overhead_per_packet) {
void PacketSenderImp::SetTransportOverhead(
webrtc::DataSize overhead_per_packet) {
pacing_controller_.SetTransportOverhead(overhead_per_packet);
MaybeProcessPackets(webrtc::Timestamp::MinusInfinity());
}
void PacketSender::SetQueueTimeLimit(webrtc::TimeDelta limit) {
void PacketSenderImp::SetQueueTimeLimit(webrtc::TimeDelta limit) {
pacing_controller_.SetQueueTimeLimit(limit);
MaybeProcessPackets(webrtc::Timestamp::MinusInfinity());
}
webrtc::TimeDelta PacketSender::ExpectedQueueTime() const {
webrtc::TimeDelta PacketSenderImp::ExpectedQueueTime() const {
return GetStats().expected_queue_time;
}
webrtc::DataSize PacketSender::QueueSizeData() const {
webrtc::DataSize PacketSenderImp::QueueSizeData() const {
return GetStats().queue_size;
}
std::optional<webrtc::Timestamp> PacketSender::FirstSentPacketTime() const {
std::optional<webrtc::Timestamp> PacketSenderImp::FirstSentPacketTime() const {
return GetStats().first_sent_packet_time;
}
webrtc::TimeDelta PacketSender::OldestPacketWaitTime() const {
webrtc::TimeDelta PacketSenderImp::OldestPacketWaitTime() const {
webrtc::Timestamp oldest_packet = GetStats().oldest_packet_enqueue_time;
if (oldest_packet.IsInfinite()) {
return webrtc::TimeDelta::Zero();
@@ -144,17 +152,11 @@ webrtc::TimeDelta PacketSender::OldestPacketWaitTime() const {
return current - oldest_packet;
}
void PacketSender::CreateProbeClusters(
std::vector<webrtc::ProbeClusterConfig> probe_cluster_configs) {
pacing_controller_.CreateProbeClusters(probe_cluster_configs);
MaybeScheduleProcessPackets();
}
void PacketSender::OnStatsUpdated(const Stats &stats) {
void PacketSenderImp::OnStatsUpdated(const Stats &stats) {
current_stats_ = stats;
}
void PacketSender::MaybeScheduleProcessPackets() {
void PacketSenderImp::MaybeScheduleProcessPackets() {
LOG_ERROR("x1");
if (!processing_packets_) {
LOG_ERROR("x2");
@@ -162,7 +164,7 @@ void PacketSender::MaybeScheduleProcessPackets() {
}
}
void PacketSender::MaybeProcessPackets(
void PacketSenderImp::MaybeProcessPackets(
webrtc::Timestamp scheduled_process_time) {
if (is_shutdown_ || !is_started_) {
LOG_ERROR("shutdown {}, started {}", is_shutdown_, is_started_);
@@ -171,9 +173,8 @@ void PacketSender::MaybeProcessPackets(
// Protects against re-entry from transport feedback calling into the task
// queue pacer.
processing_packets_ = true;
// auto cleanup = std::unique_ptr<void, std::function<void(void *)>>(
// nullptr, [this](void *) { processing_packets_ = false; });
auto cleanup = std::unique_ptr<void, std::function<void(void *)>>(
nullptr, [this](void *) { processing_packets_ = false; });
webrtc::Timestamp next_send_time = pacing_controller_.NextSendTime();
const webrtc::Timestamp now = clock_->CurrentTime();
@@ -230,20 +231,14 @@ void PacketSender::MaybeProcessPackets(
if (next_process_time_.IsMinusInfinity() ||
next_process_time_ > next_send_time) {
// Prefer low precision if allowed and not probing.
// task_queue_->PostDelayedHighPrecisionTask(
// SafeTask(
// safety_.flag(),
// [this, next_send_time]() { MaybeProcessPackets(next_send_time);
// }),
MaybeProcessPackets(next_send_time);
time_to_next_process.RoundUpTo(webrtc::TimeDelta::Millis(1));
task_queue_.PostDelayedTask(
[this, next_send_time]() { MaybeProcessPackets(next_send_time); },
time_to_next_process.RoundUpTo(webrtc::TimeDelta::Millis(1)).ms());
next_process_time_ = next_send_time;
}
processing_packets_ = false;
}
void PacketSender::UpdateStats() {
void PacketSenderImp::UpdateStats() {
Stats new_stats;
new_stats.expected_queue_time = pacing_controller_.ExpectedQueueTime();
new_stats.first_sent_packet_time = pacing_controller_.FirstSentPacketTime();
@@ -253,4 +248,6 @@ void PacketSender::UpdateStats() {
OnStatsUpdated(new_stats);
}
PacketSender::Stats PacketSender::GetStats() const { return current_stats_; }
PacketSenderImp::Stats PacketSenderImp::GetStats() const {
return current_stats_;
}

View File

@@ -4,8 +4,8 @@
* Copyright (c) 2025 by DI JUNKUN, All Rights Reserved.
*/
#ifndef _PACKET_SENDER_H_
#define _PACKET_SENDER_H_
#ifndef _PACKET_SENDER_IMP_H_
#define _PACKET_SENDER_IMP_H_
#include <memory>
@@ -18,19 +18,28 @@
#include "ice_agent.h"
#include "log.h"
#include "pacing_controller.h"
#include "packet_sender.h"
#include "rtc_base/numerics/exp_filter.h"
#include "rtp_packet_pacer.h"
#include "rtp_packet_to_send.h"
#include "task_queue.h"
class PacketSender : public webrtc::RtpPacketPacer,
public webrtc::PacingController::PacketSender {
class PacketSenderImp : public PacketSender,
public webrtc::RtpPacketPacer,
public webrtc::PacingController::PacketSender {
public:
static const int kNoPacketHoldback;
PacketSender(std::shared_ptr<IceAgent> ice_agent,
std::shared_ptr<webrtc::Clock> clock);
~PacketSender();
PacketSenderImp(std::shared_ptr<IceAgent> ice_agent,
std::shared_ptr<webrtc::Clock> clock);
~PacketSenderImp();
public:
int Send() { return 0; }
int InsertRtpPacket(std::vector<std::unique_ptr<RtpPacket>>& rtp_packets) {
return 0;
}
void SetOnSentPacketFunc(
std::function<void(const webrtc::RtpPacketToSend&)> on_sent_packet_func) {
@@ -58,7 +67,7 @@ class PacketSender : public webrtc::RtpPacketPacer,
std::vector<std::unique_ptr<webrtc::RtpPacketToSend>> GeneratePadding(
webrtc::DataSize size) override;
// TODO(bugs.webrtc.org/1439830): Make pure once subclasses adapt.
// TODO(bugs.webrtc.org/1439830): Make pure once subclasses adapt.
void OnBatchComplete() override {}
// TODO(bugs.webrtc.org/11340): Make pure once downstream projects
@@ -83,7 +92,7 @@ class PacketSender : public webrtc::RtpPacketPacer,
// Methods implementing RtpPacketSender.
// Adds the packet to the queue and calls
// PacingController::PacketSender::SendPacket() when it's time to send.
// PacingController::PacketSenderImp::SendPacket() when it's time to send.
void EnqueuePackets(
std::vector<std::unique_ptr<webrtc::RtpPacketToSend>> packets);
// Remove any pending packets matching this SSRC from the packet queue.