mirror of
https://github.com/kunkundi/crossdesk.git
synced 2025-10-27 04:35:34 +08:00
[feat] move rtp packet sender out of channel module
This commit is contained in:
@@ -110,6 +110,9 @@ void IceTransport::OnIceStateChange(NiceAgent *agent, guint stream_id,
|
||||
if (state == NICE_COMPONENT_STATE_READY ||
|
||||
state == NICE_COMPONENT_STATE_CONNECTED) {
|
||||
ice_io_statistics_->Start();
|
||||
if (ice_transport_controller_) {
|
||||
ice_transport_controller_->UpdateNetworkAvaliablity(true);
|
||||
}
|
||||
}
|
||||
|
||||
on_ice_status_change_(nice_component_state_to_string(state),
|
||||
@@ -202,6 +205,8 @@ void IceTransport::OnReceiveBuffer(NiceAgent *agent, guint stream_id,
|
||||
// LOG_ERROR("Rtcp packet [{}]", (uint8_t)(buffer[1]));
|
||||
RtcpPacketInfo rtcp_packet_info;
|
||||
ParseRtcpPacket((const uint8_t *)buffer, size, &rtcp_packet_info);
|
||||
} else if (CheckIsRtpPaddingPacket(buffer, size)) {
|
||||
// LOG_WARN("Rtp padding packet");
|
||||
} else {
|
||||
LOG_ERROR("Unknown packet");
|
||||
}
|
||||
@@ -920,7 +925,22 @@ uint8_t IceTransport::CheckIsRtpPacket(const char *buffer, size_t size) {
|
||||
}
|
||||
|
||||
uint8_t payload_type = buffer[1] & 0x7F;
|
||||
if (payload_type >= 96 && payload_type <= 127) {
|
||||
if (payload_type == 96 || payload_type == 99 || payload_type == 111 ||
|
||||
payload_type == 127) {
|
||||
return payload_type;
|
||||
} else {
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
uint8_t IceTransport::CheckIsRtpPaddingPacket(const char *buffer, size_t size) {
|
||||
if (size < 2) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
uint8_t payload_type = buffer[1] & 0x7F;
|
||||
if (payload_type == 95 || payload_type == 98 || payload_type == 110 ||
|
||||
payload_type == 126) {
|
||||
return payload_type;
|
||||
} else {
|
||||
return 0;
|
||||
|
||||
@@ -105,6 +105,7 @@ class IceTransport {
|
||||
|
||||
private:
|
||||
uint8_t CheckIsRtpPacket(const char *buffer, size_t size);
|
||||
uint8_t CheckIsRtpPaddingPacket(const char *buffer, size_t size);
|
||||
uint8_t CheckIsRtcpPacket(const char *buffer, size_t size);
|
||||
uint8_t CheckIsVideoPacket(const char *buffer, size_t size);
|
||||
uint8_t CheckIsAudioPacket(const char *buffer, size_t size);
|
||||
|
||||
@@ -42,6 +42,7 @@ void IceTransportController::Create(
|
||||
std::shared_ptr<IOStatistics> ice_io_statistics,
|
||||
OnReceiveVideo on_receive_video, OnReceiveAudio on_receive_audio,
|
||||
OnReceiveData on_receive_data, void* user_data) {
|
||||
ice_agent_ = ice_agent;
|
||||
remote_user_id_ = remote_user_id;
|
||||
on_receive_video_ = on_receive_video;
|
||||
on_receive_audio_ = on_receive_audio;
|
||||
@@ -53,6 +54,16 @@ void IceTransportController::Create(
|
||||
|
||||
controller_ = std::make_unique<CongestionControl>();
|
||||
packet_sender_ = std::make_unique<PacketSender>(ice_agent, webrtc_clock_);
|
||||
packet_sender_->SetPacingRates(DataRate::BitsPerSec(300000),
|
||||
DataRate::Zero());
|
||||
packet_sender_->SetOnSentPacketFunc(
|
||||
[this](const webrtc::RtpPacketToSend& packet) {
|
||||
if (ice_agent_) {
|
||||
ice_agent_->Send((const char*)packet.Buffer().data(), packet.Size());
|
||||
OnSentRtpPacket(packet);
|
||||
}
|
||||
});
|
||||
|
||||
resolution_adapter_ = std::make_unique<ResolutionAdapter>();
|
||||
|
||||
video_channel_send_ = std::make_unique<VideoChannelSend>(
|
||||
@@ -60,6 +71,13 @@ void IceTransportController::Create(
|
||||
[this](const webrtc::RtpPacketToSend& packet) {
|
||||
OnSentRtpPacket(packet);
|
||||
});
|
||||
|
||||
packet_sender_->SetGeneratePaddingFunc(
|
||||
[this](uint32_t size, int64_t capture_timestamp_ms)
|
||||
-> std::vector<std::unique_ptr<RtpPacket>> {
|
||||
return video_channel_send_->GeneratePadding(size, capture_timestamp_ms);
|
||||
});
|
||||
|
||||
audio_channel_send_ =
|
||||
std::make_unique<AudioChannelSend>(ice_agent, ice_io_statistics);
|
||||
data_channel_send_ =
|
||||
@@ -69,6 +87,10 @@ void IceTransportController::Create(
|
||||
audio_channel_send_->Initialize(rtp::PAYLOAD_TYPE::OPUS);
|
||||
data_channel_send_->Initialize(rtp::PAYLOAD_TYPE::DATA);
|
||||
|
||||
video_channel_send_->SetEnqueuePacketsFunc(
|
||||
[this](std::vector<std::unique_ptr<webrtc::RtpPacketToSend>>& packets)
|
||||
-> void { packet_sender_->EnqueuePackets(std::move(packets)); });
|
||||
|
||||
std::weak_ptr<IceTransportController> weak_self = shared_from_this();
|
||||
video_channel_receive_ = std::make_unique<VideoChannelReceive>(
|
||||
clock_, ice_agent, ice_io_statistics,
|
||||
@@ -161,6 +183,7 @@ int IceTransportController::SendVideo(const XVideoFrame* video_frame) {
|
||||
[this](std::shared_ptr<VideoFrameWrapper> encoded_frame) -> int {
|
||||
if (video_channel_send_) {
|
||||
video_channel_send_->SendVideo(encoded_frame);
|
||||
LOG_WARN("SendVideo rtp packets");
|
||||
}
|
||||
|
||||
return 0;
|
||||
@@ -205,6 +228,17 @@ int IceTransportController::SendData(const char* data, size_t size) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
void IceTransportController::UpdateNetworkAvaliablity(bool network_available) {
|
||||
if (controller_) {
|
||||
webrtc::NetworkAvailability msg;
|
||||
msg.at_time =
|
||||
webrtc::Timestamp::Millis(webrtc_clock_->TimeInMilliseconds());
|
||||
msg.network_available = network_available;
|
||||
controller_->OnNetworkAvailability(msg);
|
||||
packet_sender_->EnsureStarted();
|
||||
}
|
||||
}
|
||||
|
||||
int IceTransportController::OnReceiveVideoRtpPacket(const char* data,
|
||||
size_t size) {
|
||||
if (video_channel_receive_) {
|
||||
@@ -486,7 +520,7 @@ void IceTransportController::PostUpdates(webrtc::NetworkControlUpdate update) {
|
||||
target_height_.reset();
|
||||
}
|
||||
video_encoder_->SetTargetBitrate(target_bitrate_);
|
||||
LOG_WARN("Set target bitrate [{}]bps", target_bitrate_);
|
||||
// LOG_WARN("Set target bitrate [{}]bps", target_bitrate_);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -58,6 +58,8 @@ class IceTransportController
|
||||
|
||||
void FullIntraRequest() { b_force_i_frame_ = true; }
|
||||
|
||||
void UpdateNetworkAvaliablity(bool network_available);
|
||||
|
||||
int OnReceiveVideoRtpPacket(const char *data, size_t size);
|
||||
int OnReceiveAudioRtpPacket(const char *data, size_t size);
|
||||
int OnReceiveDataRtpPacket(const char *data, size_t size);
|
||||
|
||||
@@ -3,116 +3,244 @@
|
||||
|
||||
#include "log.h"
|
||||
|
||||
const int PacketSender::kNoPacketHoldback = -1;
|
||||
|
||||
PacketSender::PacketSender(std::shared_ptr<IceAgent> ice_agent,
|
||||
std::shared_ptr<webrtc::Clock> clock)
|
||||
: ice_agent_(ice_agent),
|
||||
clock_(clock),
|
||||
pacing_controller_(clock.get(), this) {}
|
||||
pacing_controller_(clock.get(), this),
|
||||
max_hold_back_window_(webrtc::TimeDelta::Millis(5)),
|
||||
max_hold_back_window_in_packets_(3),
|
||||
next_process_time_(webrtc::Timestamp::MinusInfinity()),
|
||||
is_started_(false),
|
||||
is_shutdown_(false),
|
||||
packet_size_(/*alpha=*/0.95),
|
||||
include_overhead_(false) {}
|
||||
|
||||
PacketSender::~PacketSender() {}
|
||||
|
||||
// int PacketSender::SendPacket(const char *data, size_t size) {
|
||||
// LOG_INFO("Send packet, size: %d", size);
|
||||
// return ice_agent_->Send(data, size);
|
||||
// }
|
||||
std::vector<std::unique_ptr<webrtc::RtpPacketToSend>>
|
||||
PacketSender::GeneratePadding(webrtc::DataSize size) {
|
||||
std::vector<std::unique_ptr<webrtc::RtpPacketToSend>> to_send_rtp_packets;
|
||||
std::vector<std::unique_ptr<RtpPacket>> rtp_packets =
|
||||
generat_padding_func_(size.bytes(), clock_->CurrentTime().ms());
|
||||
// for (auto &packet : rtp_packets) {
|
||||
// std::unique_ptr<webrtc::RtpPacketToSend> rtp_packet_to_send(
|
||||
// static_cast<webrtc::RtpPacketToSend *>(packet.release()));
|
||||
// to_send_rtp_packets.push_back(std::move(rtp_packet_to_send));
|
||||
// }
|
||||
|
||||
// void PacketSender::CreateProbeClusters(
|
||||
// std::vector<webrtc::ProbeClusterConfig> probe_cluster_configs) {
|
||||
// pacing_controller_.CreateProbeClusters(probe_cluster_configs);
|
||||
// MaybeScheduleProcessPackets();
|
||||
// }
|
||||
return to_send_rtp_packets;
|
||||
}
|
||||
|
||||
// void PacketSender::MaybeScheduleProcessPackets() {
|
||||
// if (!processing_packets_)
|
||||
// MaybeProcessPackets(webrtc::Timestamp::MinusInfinity());
|
||||
// }
|
||||
void PacketSender::SetSendBurstInterval(webrtc::TimeDelta burst_interval) {
|
||||
pacing_controller_.SetSendBurstInterval(burst_interval);
|
||||
}
|
||||
|
||||
// void PacketSender::MaybeProcessPackets(
|
||||
// webrtc::Timestamp scheduled_process_time) {
|
||||
// if (is_shutdown_ || !is_started_) {
|
||||
// return;
|
||||
// }
|
||||
void PacketSender::SetAllowProbeWithoutMediaPacket(bool allow) {
|
||||
pacing_controller_.SetAllowProbeWithoutMediaPacket(allow);
|
||||
}
|
||||
|
||||
// // Protects against re-entry from transport feedback calling into the task
|
||||
// // queue pacer.
|
||||
// processing_packets_ = true;
|
||||
// auto cleanup = std::unique_ptr<void, std::function<void(void *)>>(
|
||||
// nullptr, [this](void *) { processing_packets_ = false; });
|
||||
void PacketSender::EnsureStarted() {
|
||||
is_started_ = true;
|
||||
MaybeProcessPackets(webrtc::Timestamp::MinusInfinity());
|
||||
}
|
||||
|
||||
// webrtc::Timestamp next_send_time = pacing_controller_.NextSendTime();
|
||||
// const webrtc::Timestamp now = clock_->CurrentTime();
|
||||
// webrtc::TimeDelta early_execute_margin =
|
||||
// pacing_controller_.IsProbing()
|
||||
// ? webrtc::PacingController::kMaxEarlyProbeProcessing
|
||||
// : webrtc::TimeDelta::Zero();
|
||||
void PacketSender::Pause() { pacing_controller_.Pause(); }
|
||||
|
||||
// // Process packets and update stats.
|
||||
// while (next_send_time <= now + early_execute_margin) {
|
||||
// pacing_controller_.ProcessPackets();
|
||||
// next_send_time = pacing_controller_.NextSendTime();
|
||||
void PacketSender::Resume() {
|
||||
pacing_controller_.Resume();
|
||||
MaybeProcessPackets(webrtc::Timestamp::MinusInfinity());
|
||||
}
|
||||
|
||||
// // Probing state could change. Get margin after process packets.
|
||||
// early_execute_margin =
|
||||
// pacing_controller_.IsProbing()
|
||||
// ? webrtc::PacingController::kMaxEarlyProbeProcessing
|
||||
// : webrtc::TimeDelta::Zero();
|
||||
// }
|
||||
// UpdateStats();
|
||||
void PacketSender::SetCongested(bool congested) {
|
||||
pacing_controller_.SetCongested(congested);
|
||||
MaybeScheduleProcessPackets();
|
||||
}
|
||||
|
||||
// // Ignore retired scheduled task, otherwise reset `next_process_time_`.
|
||||
// if (scheduled_process_time.IsFinite()) {
|
||||
// if (scheduled_process_time != next_process_time_) {
|
||||
// return;
|
||||
// }
|
||||
// next_process_time_ = webrtc::Timestamp::MinusInfinity();
|
||||
// }
|
||||
void PacketSender::SetPacingRates(webrtc::DataRate pacing_rate,
|
||||
webrtc::DataRate padding_rate) {
|
||||
pacing_controller_.SetPacingRates(pacing_rate, padding_rate);
|
||||
MaybeScheduleProcessPackets();
|
||||
}
|
||||
|
||||
// // Do not hold back in probing.
|
||||
// webrtc::TimeDelta hold_back_window = webrtc::TimeDelta::Zero();
|
||||
// if (!pacing_controller_.IsProbing()) {
|
||||
// hold_back_window = max_hold_back_window_;
|
||||
// webrtc::DataRate pacing_rate = pacing_controller_.pacing_rate();
|
||||
// if (max_hold_back_window_in_packets_ != kNoPacketHoldback &&
|
||||
// !pacing_rate.IsZero() &&
|
||||
// packet_size_.filtered() != rtc::ExpFilter::kValueUndefined) {
|
||||
// webrtc::TimeDelta avg_packet_send_time =
|
||||
// webrtc::DataSize::Bytes(packet_size_.filtered()) / pacing_rate;
|
||||
// hold_back_window =
|
||||
// std::min(hold_back_window,
|
||||
// avg_packet_send_time * max_hold_back_window_in_packets_);
|
||||
// }
|
||||
// }
|
||||
void PacketSender::EnqueuePackets(
|
||||
std::vector<std::unique_ptr<webrtc::RtpPacketToSend>> packets) {
|
||||
// task_queue_->PostTask()
|
||||
for (auto &packet : packets) {
|
||||
size_t packet_size = packet->payload_size() + packet->padding_size();
|
||||
if (include_overhead_) {
|
||||
packet_size += packet->headers_size();
|
||||
}
|
||||
packet_size_.Apply(1, packet_size);
|
||||
pacing_controller_.EnqueuePacket(std::move(packet));
|
||||
}
|
||||
MaybeProcessPackets(webrtc::Timestamp::MinusInfinity());
|
||||
}
|
||||
|
||||
// // Calculate next process time.
|
||||
// webrtc::TimeDelta time_to_next_process =
|
||||
// std::max(hold_back_window, next_send_time - now -
|
||||
// early_execute_margin);
|
||||
// next_send_time = now + time_to_next_process;
|
||||
void PacketSender::RemovePacketsForSsrc(uint32_t ssrc) {
|
||||
// task_queue_->PostTask(SafeTask(safety_.flag(), [this, ssrc] {
|
||||
pacing_controller_.RemovePacketsForSsrc(ssrc);
|
||||
MaybeProcessPackets(webrtc::Timestamp::MinusInfinity());
|
||||
// }));
|
||||
}
|
||||
|
||||
// // If no in flight task or in flight task is later than `next_send_time`,
|
||||
// // schedule a new one. Previous in flight task will be retired.
|
||||
// if (next_process_time_.IsMinusInfinity() ||
|
||||
// next_process_time_ > next_send_time) {
|
||||
// // Prefer low precision if allowed and not probing.
|
||||
// task_queue_->PostDelayedHighPrecisionTask(
|
||||
// SafeTask(
|
||||
// safety_.flag(),
|
||||
// [this, next_send_time]() { MaybeProcessPackets(next_send_time);
|
||||
// }),
|
||||
// time_to_next_process.RoundUpTo(webrtc::TimeDelta::Millis(1)));
|
||||
// next_process_time_ = next_send_time;
|
||||
// }
|
||||
// }
|
||||
void PacketSender::SetAccountForAudioPackets(bool account_for_audio) {
|
||||
pacing_controller_.SetAccountForAudioPackets(account_for_audio);
|
||||
MaybeProcessPackets(webrtc::Timestamp::MinusInfinity());
|
||||
}
|
||||
|
||||
// void PacketSender::UpdateStats() {
|
||||
// Stats new_stats;
|
||||
// new_stats.expected_queue_time = pacing_controller_.ExpectedQueueTime();
|
||||
// new_stats.first_sent_packet_time =
|
||||
// pacing_controller_.FirstSentPacketTime();
|
||||
// new_stats.oldest_packet_enqueue_time =
|
||||
// pacing_controller_.OldestPacketEnqueueTime();
|
||||
// new_stats.queue_size = pacing_controller_.QueueSizeData();
|
||||
// OnStatsUpdated(new_stats);
|
||||
// }
|
||||
void PacketSender::SetIncludeOverhead() {
|
||||
include_overhead_ = true;
|
||||
pacing_controller_.SetIncludeOverhead();
|
||||
MaybeProcessPackets(webrtc::Timestamp::MinusInfinity());
|
||||
}
|
||||
|
||||
// PacketSender::Stats PacketSender::GetStats() const { return current_stats_; }
|
||||
void PacketSender::SetTransportOverhead(webrtc::DataSize overhead_per_packet) {
|
||||
pacing_controller_.SetTransportOverhead(overhead_per_packet);
|
||||
MaybeProcessPackets(webrtc::Timestamp::MinusInfinity());
|
||||
}
|
||||
|
||||
void PacketSender::SetQueueTimeLimit(webrtc::TimeDelta limit) {
|
||||
pacing_controller_.SetQueueTimeLimit(limit);
|
||||
MaybeProcessPackets(webrtc::Timestamp::MinusInfinity());
|
||||
}
|
||||
|
||||
webrtc::TimeDelta PacketSender::ExpectedQueueTime() const {
|
||||
return GetStats().expected_queue_time;
|
||||
}
|
||||
|
||||
webrtc::DataSize PacketSender::QueueSizeData() const {
|
||||
return GetStats().queue_size;
|
||||
}
|
||||
|
||||
std::optional<webrtc::Timestamp> PacketSender::FirstSentPacketTime() const {
|
||||
return GetStats().first_sent_packet_time;
|
||||
}
|
||||
|
||||
webrtc::TimeDelta PacketSender::OldestPacketWaitTime() const {
|
||||
webrtc::Timestamp oldest_packet = GetStats().oldest_packet_enqueue_time;
|
||||
if (oldest_packet.IsInfinite()) {
|
||||
return webrtc::TimeDelta::Zero();
|
||||
}
|
||||
|
||||
// (webrtc:9716): The clock is not always monotonic.
|
||||
webrtc::Timestamp current = clock_->CurrentTime();
|
||||
if (current < oldest_packet) {
|
||||
return webrtc::TimeDelta::Zero();
|
||||
}
|
||||
|
||||
return current - oldest_packet;
|
||||
}
|
||||
|
||||
void PacketSender::CreateProbeClusters(
|
||||
std::vector<webrtc::ProbeClusterConfig> probe_cluster_configs) {
|
||||
pacing_controller_.CreateProbeClusters(probe_cluster_configs);
|
||||
MaybeScheduleProcessPackets();
|
||||
}
|
||||
|
||||
void PacketSender::OnStatsUpdated(const Stats &stats) {
|
||||
current_stats_ = stats;
|
||||
}
|
||||
|
||||
void PacketSender::MaybeScheduleProcessPackets() {
|
||||
LOG_ERROR("x1");
|
||||
if (!processing_packets_) {
|
||||
LOG_ERROR("x2");
|
||||
MaybeProcessPackets(webrtc::Timestamp::MinusInfinity());
|
||||
}
|
||||
}
|
||||
|
||||
void PacketSender::MaybeProcessPackets(
|
||||
webrtc::Timestamp scheduled_process_time) {
|
||||
if (is_shutdown_ || !is_started_) {
|
||||
LOG_ERROR("shutdown {}, started {}", is_shutdown_, is_started_);
|
||||
return;
|
||||
}
|
||||
|
||||
// Protects against re-entry from transport feedback calling into the task
|
||||
// queue pacer.
|
||||
processing_packets_ = true;
|
||||
// auto cleanup = std::unique_ptr<void, std::function<void(void *)>>(
|
||||
// nullptr, [this](void *) { processing_packets_ = false; });
|
||||
|
||||
webrtc::Timestamp next_send_time = pacing_controller_.NextSendTime();
|
||||
const webrtc::Timestamp now = clock_->CurrentTime();
|
||||
webrtc::TimeDelta early_execute_margin =
|
||||
pacing_controller_.IsProbing()
|
||||
? webrtc::PacingController::kMaxEarlyProbeProcessing
|
||||
: webrtc::TimeDelta::Zero();
|
||||
|
||||
// Process packets and update stats.
|
||||
while (next_send_time <= now + early_execute_margin) {
|
||||
pacing_controller_.ProcessPackets();
|
||||
next_send_time = pacing_controller_.NextSendTime();
|
||||
|
||||
// Probing state could change. Get margin after process packets.
|
||||
early_execute_margin =
|
||||
pacing_controller_.IsProbing()
|
||||
? webrtc::PacingController::kMaxEarlyProbeProcessing
|
||||
: webrtc::TimeDelta::Zero();
|
||||
}
|
||||
|
||||
UpdateStats();
|
||||
|
||||
// Ignore retired scheduled task, otherwise reset `next_process_time_`.
|
||||
if (scheduled_process_time.IsFinite()) {
|
||||
if (scheduled_process_time != next_process_time_) {
|
||||
return;
|
||||
}
|
||||
next_process_time_ = webrtc::Timestamp::MinusInfinity();
|
||||
}
|
||||
|
||||
// Do not hold back in probing.
|
||||
webrtc::TimeDelta hold_back_window = webrtc::TimeDelta::Zero();
|
||||
if (!pacing_controller_.IsProbing()) {
|
||||
hold_back_window = max_hold_back_window_;
|
||||
webrtc::DataRate pacing_rate = pacing_controller_.pacing_rate();
|
||||
if (max_hold_back_window_in_packets_ != kNoPacketHoldback &&
|
||||
!pacing_rate.IsZero() &&
|
||||
packet_size_.filtered() != rtc::ExpFilter::kValueUndefined) {
|
||||
webrtc::TimeDelta avg_packet_send_time =
|
||||
webrtc::DataSize::Bytes(packet_size_.filtered()) / pacing_rate;
|
||||
hold_back_window =
|
||||
std::min(hold_back_window,
|
||||
avg_packet_send_time * max_hold_back_window_in_packets_);
|
||||
}
|
||||
}
|
||||
|
||||
// Calculate next process time.
|
||||
webrtc::TimeDelta time_to_next_process =
|
||||
std::max(hold_back_window, next_send_time - now - early_execute_margin);
|
||||
next_send_time = now + time_to_next_process;
|
||||
|
||||
// If no in flight task or in flight task is later than `next_send_time`,
|
||||
// schedule a new one. Previous in flight task will be retired.
|
||||
if (next_process_time_.IsMinusInfinity() ||
|
||||
next_process_time_ > next_send_time) {
|
||||
// Prefer low precision if allowed and not probing.
|
||||
// task_queue_->PostDelayedHighPrecisionTask(
|
||||
// SafeTask(
|
||||
// safety_.flag(),
|
||||
// [this, next_send_time]() { MaybeProcessPackets(next_send_time);
|
||||
// }),
|
||||
MaybeProcessPackets(next_send_time);
|
||||
time_to_next_process.RoundUpTo(webrtc::TimeDelta::Millis(1));
|
||||
next_process_time_ = next_send_time;
|
||||
}
|
||||
|
||||
processing_packets_ = false;
|
||||
}
|
||||
|
||||
void PacketSender::UpdateStats() {
|
||||
Stats new_stats;
|
||||
new_stats.expected_queue_time = pacing_controller_.ExpectedQueueTime();
|
||||
new_stats.first_sent_packet_time = pacing_controller_.FirstSentPacketTime();
|
||||
new_stats.oldest_packet_enqueue_time =
|
||||
pacing_controller_.OldestPacketEnqueueTime();
|
||||
new_stats.queue_size = pacing_controller_.QueueSizeData();
|
||||
OnStatsUpdated(new_stats);
|
||||
}
|
||||
|
||||
PacketSender::Stats PacketSender::GetStats() const { return current_stats_; }
|
||||
@@ -16,6 +16,7 @@
|
||||
#include "api/units/time_delta.h"
|
||||
#include "api/units/timestamp.h"
|
||||
#include "ice_agent.h"
|
||||
#include "log.h"
|
||||
#include "pacing_controller.h"
|
||||
#include "rtc_base/numerics/exp_filter.h"
|
||||
#include "rtp_packet_pacer.h"
|
||||
@@ -24,74 +25,38 @@
|
||||
class PacketSender : public webrtc::RtpPacketPacer,
|
||||
public webrtc::PacingController::PacketSender {
|
||||
public:
|
||||
static const int kNoPacketHoldback;
|
||||
|
||||
PacketSender(std::shared_ptr<IceAgent> ice_agent,
|
||||
std::shared_ptr<webrtc::Clock> clock);
|
||||
~PacketSender();
|
||||
|
||||
int SendPacket(const char* data, size_t size);
|
||||
|
||||
public:
|
||||
void CreateProbeClusters(
|
||||
std::vector<webrtc::ProbeClusterConfig> probe_cluster_configs) override{};
|
||||
|
||||
// Temporarily pause all sending.
|
||||
void Pause() override{};
|
||||
|
||||
// Resume sending packets.
|
||||
void Resume() override{};
|
||||
|
||||
void SetCongested(bool congested) override{};
|
||||
|
||||
// Sets the pacing rates. Must be called once before packets can be sent.
|
||||
void SetPacingRates(webrtc::DataRate pacing_rate,
|
||||
webrtc::DataRate padding_rate) override{};
|
||||
|
||||
// Time since the oldest packet currently in the queue was added.
|
||||
webrtc::TimeDelta OldestPacketWaitTime() const override {
|
||||
return webrtc::TimeDelta::Zero();
|
||||
};
|
||||
|
||||
// Sum of payload + padding bytes of all packets currently in the pacer queue.
|
||||
webrtc::DataSize QueueSizeData() const override {
|
||||
return webrtc::DataSize::Zero();
|
||||
};
|
||||
|
||||
// Returns the time when the first packet was sent.
|
||||
std::optional<webrtc::Timestamp> FirstSentPacketTime() const override {
|
||||
return {};
|
||||
void SetOnSentPacketFunc(
|
||||
std::function<void(const webrtc::RtpPacketToSend&)> on_sent_packet_func) {
|
||||
on_sent_packet_func_ = on_sent_packet_func;
|
||||
}
|
||||
|
||||
// Returns the expected number of milliseconds it will take to send the
|
||||
// current packets in the queue, given the current size and bitrate, ignoring
|
||||
// priority.
|
||||
webrtc::TimeDelta ExpectedQueueTime() const override {
|
||||
return webrtc::TimeDelta::Zero();
|
||||
};
|
||||
|
||||
// Set the average upper bound on pacer queuing delay. The pacer may send at
|
||||
// a higher rate than what was configured via SetPacingRates() in order to
|
||||
// keep ExpectedQueueTimeMs() below `limit_ms` on average.
|
||||
void SetQueueTimeLimit(webrtc::TimeDelta limit) override{};
|
||||
|
||||
// Currently audio traffic is not accounted by pacer and passed through.
|
||||
// With the introduction of audio BWE audio traffic will be accounted for
|
||||
// the pacer budget calculation. The audio traffic still will be injected
|
||||
// at high priority.
|
||||
void SetAccountForAudioPackets(bool account_for_audio) override{};
|
||||
void SetIncludeOverhead() override{};
|
||||
void SetTransportOverhead(webrtc::DataSize overhead_per_packet) override{};
|
||||
void SetGeneratePaddingFunc(
|
||||
std::function<std::vector<std::unique_ptr<RtpPacket>>(uint32_t, int64_t)>
|
||||
generat_padding_func) {
|
||||
generat_padding_func_ = generat_padding_func;
|
||||
}
|
||||
|
||||
public:
|
||||
void SendPacket(std::unique_ptr<webrtc::RtpPacketToSend> packet,
|
||||
const webrtc::PacedPacketInfo& cluster_info) override {}
|
||||
const webrtc::PacedPacketInfo& cluster_info) override {
|
||||
if (on_sent_packet_func_) {
|
||||
on_sent_packet_func_(*packet);
|
||||
}
|
||||
}
|
||||
// Should be called after each call to SendPacket().
|
||||
std::vector<std::unique_ptr<webrtc::RtpPacketToSend>> FetchFec() override {
|
||||
return {};
|
||||
std::vector<std::unique_ptr<webrtc::RtpPacketToSend>> fec_packets;
|
||||
return fec_packets;
|
||||
}
|
||||
std::vector<std::unique_ptr<webrtc::RtpPacketToSend>> GeneratePadding(
|
||||
webrtc::DataSize size) override {
|
||||
return {};
|
||||
}
|
||||
webrtc::DataSize size) override;
|
||||
|
||||
// TODO(bugs.webrtc.org/1439830): Make pure once subclasses adapt.
|
||||
void OnBatchComplete() override {}
|
||||
|
||||
@@ -105,12 +70,133 @@ class PacketSender : public webrtc::RtpPacketPacer,
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
public:
|
||||
void SetSendBurstInterval(webrtc::TimeDelta burst_interval);
|
||||
|
||||
// A probe may be sent without first waing for a media packet.
|
||||
void SetAllowProbeWithoutMediaPacket(bool allow);
|
||||
|
||||
// Ensure that necessary delayed tasks are scheduled.
|
||||
void EnsureStarted();
|
||||
|
||||
// Methods implementing RtpPacketSender.
|
||||
|
||||
// Adds the packet to the queue and calls
|
||||
// PacingController::PacketSender::SendPacket() when it's time to send.
|
||||
void EnqueuePackets(
|
||||
std::vector<std::unique_ptr<webrtc::RtpPacketToSend>> packets);
|
||||
// Remove any pending packets matching this SSRC from the packet queue.
|
||||
void RemovePacketsForSsrc(uint32_t ssrc);
|
||||
|
||||
void CreateProbeClusters(
|
||||
std::vector<webrtc::ProbeClusterConfig> probe_cluster_configs) override;
|
||||
|
||||
// Temporarily pause all sending.
|
||||
void Pause() override;
|
||||
|
||||
// Resume sending packets.
|
||||
void Resume() override;
|
||||
|
||||
void SetCongested(bool congested) override;
|
||||
|
||||
// Sets the pacing rates. Must be called once before packets can be sent.
|
||||
void SetPacingRates(webrtc::DataRate pacing_rate,
|
||||
webrtc::DataRate padding_rate) override;
|
||||
|
||||
// Currently audio traffic is not accounted for by pacer and passed through.
|
||||
// With the introduction of audio BWE, audio traffic will be accounted for
|
||||
// in the pacer budget calculation. The audio traffic will still be injected
|
||||
// at high priority.
|
||||
void SetAccountForAudioPackets(bool account_for_audio) override;
|
||||
|
||||
void SetIncludeOverhead() override;
|
||||
void SetTransportOverhead(webrtc::DataSize overhead_per_packet) override;
|
||||
|
||||
// Time since the oldest packet currently in the queue was added.
|
||||
webrtc::TimeDelta OldestPacketWaitTime() const override;
|
||||
|
||||
// Sum of payload + padding bytes of all packets currently in the pacer queue.
|
||||
webrtc::DataSize QueueSizeData() const override;
|
||||
|
||||
// Returns the time when the first packet was sent.
|
||||
std::optional<webrtc::Timestamp> FirstSentPacketTime() const override;
|
||||
|
||||
// Returns the expected number of milliseconds it will take to send the
|
||||
// current packets in the queue, given the current size and bitrate, ignoring
|
||||
// priority.
|
||||
webrtc::TimeDelta ExpectedQueueTime() const override;
|
||||
|
||||
// Set the average upper bound on pacer queuing delay. The pacer may send at
|
||||
// a higher rate than what was configured via SetPacingRates() in order to
|
||||
// keep ExpectedQueueTimeMs() below `limit_ms` on average.
|
||||
void SetQueueTimeLimit(webrtc::TimeDelta limit) override;
|
||||
|
||||
protected:
|
||||
// Exposed as protected for test.
|
||||
struct Stats {
|
||||
Stats()
|
||||
: oldest_packet_enqueue_time(webrtc::Timestamp::MinusInfinity()),
|
||||
queue_size(webrtc::DataSize::Zero()),
|
||||
expected_queue_time(webrtc::TimeDelta::Zero()) {}
|
||||
webrtc::Timestamp oldest_packet_enqueue_time;
|
||||
webrtc::DataSize queue_size;
|
||||
webrtc::TimeDelta expected_queue_time;
|
||||
std::optional<webrtc::Timestamp> first_sent_packet_time;
|
||||
};
|
||||
void OnStatsUpdated(const Stats& stats);
|
||||
|
||||
private:
|
||||
// Call in response to state updates that could warrant sending out packets.
|
||||
// Protected against re-entry from packet sent receipts.
|
||||
void MaybeScheduleProcessPackets();
|
||||
// Check if it is time to send packets, or schedule a delayed task if not.
|
||||
// Use Timestamp::MinusInfinity() to indicate that this call has _not_
|
||||
// been scheduled by the pacing controller. If this is the case, check if we
|
||||
// can execute immediately otherwise schedule a delay task that calls this
|
||||
// method again with desired (finite) scheduled process time.
|
||||
void MaybeProcessPackets(webrtc::Timestamp scheduled_process_time);
|
||||
|
||||
void UpdateStats();
|
||||
Stats GetStats() const;
|
||||
|
||||
private:
|
||||
std::shared_ptr<IceAgent> ice_agent_ = nullptr;
|
||||
webrtc::PacingController pacing_controller_;
|
||||
std::function<void(const webrtc::RtpPacketToSend&)> on_sent_packet_func_ =
|
||||
nullptr;
|
||||
|
||||
std::function<std::vector<std::unique_ptr<RtpPacket>>(uint32_t, int64_t)>
|
||||
generat_padding_func_ = nullptr;
|
||||
|
||||
private:
|
||||
std::shared_ptr<webrtc::Clock> clock_ = nullptr;
|
||||
|
||||
private:
|
||||
const webrtc::TimeDelta max_hold_back_window_;
|
||||
const int max_hold_back_window_in_packets_;
|
||||
// We want only one (valid) delayed process task in flight at a time.
|
||||
// If the value of `next_process_time_` is finite, it is an id for a
|
||||
// delayed task that will call MaybeProcessPackets() with that time
|
||||
// as parameter.
|
||||
// Timestamp::MinusInfinity() indicates no valid pending task.
|
||||
webrtc::Timestamp next_process_time_;
|
||||
|
||||
// Indicates if this task queue is started. If not, don't allow
|
||||
// posting delayed tasks yet.
|
||||
bool is_started_;
|
||||
|
||||
// Indicates if this task queue is shutting down. If so, don't allow
|
||||
// posting any more delayed tasks as that can cause the task queue to
|
||||
// never drain.
|
||||
bool is_shutdown_;
|
||||
|
||||
// Filtered size of enqueued packets, in bytes.
|
||||
rtc::ExpFilter packet_size_;
|
||||
bool include_overhead_;
|
||||
|
||||
Stats current_stats_;
|
||||
// Protects against ProcessPackets reentry from packet sent receipts.
|
||||
bool processing_packets_ = false;
|
||||
};
|
||||
|
||||
#endif
|
||||
Reference in New Issue
Block a user