[fix] use task queue to process rtp packet history

This commit is contained in:
dijunkun
2025-04-03 16:54:28 +08:00
parent a36b352039
commit 772db42494
13 changed files with 152 additions and 196 deletions

View File

@@ -0,0 +1,314 @@
#include "paced_sender.h"
#include "log.h"
const int PacedSender::kNoPacketHoldback = -1;
PacedSender::PacedSender(std::shared_ptr<IceAgent> ice_agent,
std::shared_ptr<webrtc::Clock> clock,
std::shared_ptr<TaskQueue> task_queue)
: ice_agent_(ice_agent),
clock_(clock),
pacing_controller_(clock.get(), this),
max_hold_back_window_(webrtc::TimeDelta::Millis(5)),
max_hold_back_window_in_packets_(3),
next_process_time_(webrtc::Timestamp::MinusInfinity()),
is_started_(false),
is_shutdown_(false),
packet_size_(/*alpha=*/0.95),
include_overhead_(false),
last_send_time_(webrtc::Timestamp::Millis(0)),
last_call_time_(webrtc::Timestamp::Millis(0)),
task_queue_(task_queue) {}
PacedSender::~PacedSender() { is_shutdown_ = true; }
std::vector<std::unique_ptr<webrtc::RtpPacketToSend>>
PacedSender::GeneratePadding(webrtc::DataSize size) {
std::vector<std::unique_ptr<webrtc::RtpPacketToSend>> to_send_rtp_packets;
std::vector<std::unique_ptr<RtpPacket>> rtp_packets =
generat_padding_func_(size.bytes(), clock_->CurrentTime().ms());
for (auto &packet : rtp_packets) {
std::unique_ptr<webrtc::RtpPacketToSend> rtp_packet_to_send(
static_cast<webrtc::RtpPacketToSend *>(packet.release()));
rtp_packet_to_send->set_capture_time(clock_->CurrentTime());
rtp_packet_to_send->set_transport_sequence_number((transport_seq_)++);
rtp_packet_to_send->set_packet_type(webrtc::RtpPacketMediaType::kPadding);
to_send_rtp_packets.push_back(std::move(rtp_packet_to_send));
}
return to_send_rtp_packets;
}
void PacedSender::SetSendBurstInterval(webrtc::TimeDelta burst_interval) {
pacing_controller_.SetSendBurstInterval(burst_interval);
}
void PacedSender::SetAllowProbeWithoutMediaPacket(bool allow) {
pacing_controller_.SetAllowProbeWithoutMediaPacket(allow);
}
void PacedSender::EnsureStarted() {
is_started_ = true;
MaybeProcessPackets(webrtc::Timestamp::MinusInfinity());
}
void PacedSender::CreateProbeClusters(
std::vector<webrtc::ProbeClusterConfig> probe_cluster_configs) {
pacing_controller_.CreateProbeClusters(probe_cluster_configs);
MaybeScheduleProcessPackets();
}
void PacedSender::Pause() { pacing_controller_.Pause(); }
void PacedSender::Resume() {
pacing_controller_.Resume();
MaybeProcessPackets(webrtc::Timestamp::MinusInfinity());
}
void PacedSender::SetCongested(bool congested) {
pacing_controller_.SetCongested(congested);
MaybeScheduleProcessPackets();
}
void PacedSender::SetPacingRates(webrtc::DataRate pacing_rate,
webrtc::DataRate padding_rate) {
pacing_controller_.SetPacingRates(pacing_rate, padding_rate);
MaybeScheduleProcessPackets();
}
void PacedSender::EnqueuePackets(
std::vector<std::unique_ptr<webrtc::RtpPacketToSend>> packets) {
task_queue_->PostTask([this, packets = std::move(packets)]() mutable {
for (auto &packet : packets) {
size_t packet_size = packet->payload_size() + packet->padding_size();
if (include_overhead_) {
packet_size += packet->headers_size();
}
packet_size_.Apply(1, packet_size);
pacing_controller_.EnqueuePacket(std::move(packet));
}
MaybeProcessPackets(webrtc::Timestamp::MinusInfinity());
});
}
void PacedSender::EnqueuePacket(
std::unique_ptr<webrtc::RtpPacketToSend> packet) {
task_queue_->PostTask([this, packet = std::move(packet)]() mutable {
size_t packet_size = packet->payload_size() + packet->padding_size();
if (include_overhead_) {
packet_size += packet->headers_size();
}
packet_size_.Apply(1, packet_size);
pacing_controller_.EnqueuePacket(std::move(packet));
MaybeProcessPackets(webrtc::Timestamp::MinusInfinity());
});
}
void PacedSender::RemovePacketsForSsrc(uint32_t ssrc) {
task_queue_->PostTask([this, ssrc] {
pacing_controller_.RemovePacketsForSsrc(ssrc);
MaybeProcessPackets(webrtc::Timestamp::MinusInfinity());
});
}
void PacedSender::SetAccountForAudioPackets(bool account_for_audio) {
pacing_controller_.SetAccountForAudioPackets(account_for_audio);
MaybeProcessPackets(webrtc::Timestamp::MinusInfinity());
}
void PacedSender::SetIncludeOverhead() {
include_overhead_ = true;
pacing_controller_.SetIncludeOverhead();
MaybeProcessPackets(webrtc::Timestamp::MinusInfinity());
}
void PacedSender::SetTransportOverhead(webrtc::DataSize overhead_per_packet) {
pacing_controller_.SetTransportOverhead(overhead_per_packet);
MaybeProcessPackets(webrtc::Timestamp::MinusInfinity());
}
void PacedSender::SetQueueTimeLimit(webrtc::TimeDelta limit) {
pacing_controller_.SetQueueTimeLimit(limit);
MaybeProcessPackets(webrtc::Timestamp::MinusInfinity());
}
webrtc::TimeDelta PacedSender::ExpectedQueueTime() const {
return GetStats().expected_queue_time;
}
webrtc::DataSize PacedSender::QueueSizeData() const {
return GetStats().queue_size;
}
std::optional<webrtc::Timestamp> PacedSender::FirstSentPacketTime() const {
return GetStats().first_sent_packet_time;
}
webrtc::TimeDelta PacedSender::OldestPacketWaitTime() const {
webrtc::Timestamp oldest_packet = GetStats().oldest_packet_enqueue_time;
if (oldest_packet.IsInfinite()) {
return webrtc::TimeDelta::Zero();
}
// (webrtc:9716): The clock is not always monotonic.
webrtc::Timestamp current = clock_->CurrentTime();
if (current < oldest_packet) {
return webrtc::TimeDelta::Zero();
}
return current - oldest_packet;
}
void PacedSender::OnStatsUpdated(const Stats &stats) { current_stats_ = stats; }
void PacedSender::MaybeScheduleProcessPackets() {
if (!processing_packets_) {
MaybeProcessPackets(webrtc::Timestamp::MinusInfinity());
}
}
void PacedSender::MaybeProcessPackets(
webrtc::Timestamp scheduled_process_time) {
if (is_shutdown_ || !is_started_) {
return;
}
// Protects against re-entry from transport feedback calling into the task
// queue pacer.
auto cleanup = std::unique_ptr<void, std::function<void(void *)>>(
nullptr, [this](void *) { processing_packets_ = false; });
webrtc::Timestamp next_send_time = pacing_controller_.NextSendTime();
const webrtc::Timestamp now = clock_->CurrentTime();
webrtc::TimeDelta early_execute_margin =
pacing_controller_.IsProbing()
? webrtc::PacingController::kMaxEarlyProbeProcessing
: webrtc::TimeDelta::Zero();
// Process packets and update stats.
while (next_send_time <= now + early_execute_margin) {
pacing_controller_.ProcessPackets();
next_send_time = pacing_controller_.NextSendTime();
// Probing state could change. Get margin after process packets.
early_execute_margin =
pacing_controller_.IsProbing()
? webrtc::PacingController::kMaxEarlyProbeProcessing
: webrtc::TimeDelta::Zero();
}
UpdateStats();
// Ignore retired scheduled task, otherwise reset `next_process_time_`.
if (scheduled_process_time.IsFinite()) {
if (scheduled_process_time != next_process_time_) {
return;
}
next_process_time_ = webrtc::Timestamp::MinusInfinity();
}
// Do not hold back in probing.
webrtc::TimeDelta hold_back_window = webrtc::TimeDelta::Zero();
if (!pacing_controller_.IsProbing()) {
hold_back_window = max_hold_back_window_;
webrtc::DataRate pacing_rate = pacing_controller_.pacing_rate();
if (max_hold_back_window_in_packets_ != kNoPacketHoldback &&
!pacing_rate.IsZero() &&
packet_size_.filtered() != rtc::ExpFilter::kValueUndefined) {
webrtc::TimeDelta avg_packet_send_time =
webrtc::DataSize::Bytes(packet_size_.filtered()) / pacing_rate;
hold_back_window =
std::min(hold_back_window,
avg_packet_send_time * max_hold_back_window_in_packets_);
}
}
// Calculate next process time.
webrtc::TimeDelta time_to_next_process =
std::max(hold_back_window, next_send_time - now - early_execute_margin);
next_send_time = now + time_to_next_process;
// If no in flight task or in flight task is later than `next_send_time`,
// schedule a new one. Previous in flight task will be retired.
if (next_process_time_.IsMinusInfinity() ||
next_process_time_ > next_send_time) {
// Prefer low precision if allowed and not probing.
task_queue_->PostDelayedTask(
[this, next_send_time]() { MaybeProcessPackets(next_send_time); },
time_to_next_process.RoundUpTo(webrtc::TimeDelta::Millis(1)).ms());
next_process_time_ = next_send_time;
}
}
void PacedSender::UpdateStats() {
Stats new_stats;
new_stats.expected_queue_time = pacing_controller_.ExpectedQueueTime();
new_stats.first_sent_packet_time = pacing_controller_.FirstSentPacketTime();
new_stats.oldest_packet_enqueue_time =
pacing_controller_.OldestPacketEnqueueTime();
new_stats.queue_size = pacing_controller_.QueueSizeData();
OnStatsUpdated(new_stats);
}
PacedSender::Stats PacedSender::GetStats() const { return current_stats_; }
/*----------------------------------------------------------------------------*/
int PacedSender::EnqueueRtpPackets(
std::vector<std::unique_ptr<RtpPacket>> &rtp_packets,
int64_t captured_timestamp_us) {
std::vector<std::unique_ptr<webrtc::RtpPacketToSend>> to_send_rtp_packets;
for (auto &rtp_packet : rtp_packets) {
std::unique_ptr<webrtc::RtpPacketToSend> rtp_packet_to_send(
static_cast<webrtc::RtpPacketToSend *>(rtp_packet.release()));
rtp_packet_to_send->set_capture_time(clock_->CurrentTime());
rtp_packet_to_send->set_transport_sequence_number(transport_seq_++);
switch (rtp_packet_to_send->PayloadType()) {
case rtp::PAYLOAD_TYPE::H264:
rtp_packet_to_send->set_packet_type(webrtc::RtpPacketMediaType::kVideo);
break;
case rtp::PAYLOAD_TYPE::AV1:
rtp_packet_to_send->set_packet_type(webrtc::RtpPacketMediaType::kVideo);
break;
case rtp::PAYLOAD_TYPE::H264_FEC_SOURCE:
rtp_packet_to_send->set_packet_type(
webrtc::RtpPacketMediaType::kForwardErrorCorrection);
break;
case rtp::PAYLOAD_TYPE::H264_FEC_REPAIR:
rtp_packet_to_send->set_packet_type(
webrtc::RtpPacketMediaType::kForwardErrorCorrection);
break;
case rtp::PAYLOAD_TYPE::OPUS:
rtp_packet_to_send->set_packet_type(webrtc::RtpPacketMediaType::kAudio);
break;
default:
rtp_packet_to_send->set_packet_type(webrtc::RtpPacketMediaType::kVideo);
break;
}
// webrtc::PacedPacketInfo cluster_info;
// SendPacket(std::move(rtp_packet_to_send), cluster_info);
to_send_rtp_packets.push_back(std::move(rtp_packet_to_send));
}
EnqueuePackets(std::move(to_send_rtp_packets));
return 0;
}
int PacedSender::EnqueueRtpPackets(
std::vector<std::unique_ptr<webrtc::RtpPacketToSend>> &rtp_packets) {
EnqueuePackets(std::move(rtp_packets));
return 0;
}
int PacedSender::EnqueueRtpPacket(
std::unique_ptr<webrtc::RtpPacketToSend> rtp_packet) {
EnqueuePacket(std::move(rtp_packet));
return 0;
}

View File

@@ -0,0 +1,231 @@
/*
* @Author: DI JUNKUN
* @Date: 2025-03-12
* Copyright (c) 2025 by DI JUNKUN, All Rights Reserved.
*/
#ifndef _PACED_SENDER__H_
#define _PACED_SENDER__H_
#include <memory>
#include "api/array_view.h"
#include "api/transport/network_types.h"
#include "api/units/data_rate.h"
#include "api/units/data_size.h"
#include "api/units/time_delta.h"
#include "api/units/timestamp.h"
#include "ice_agent.h"
#include "log.h"
#include "pacing_controller.h"
#include "rtc_base/numerics/exp_filter.h"
#include "rtp_packet_pacer.h"
#include "rtp_packet_to_send.h"
#include "task_queue.h"
class PacedSender : public webrtc::RtpPacketPacer,
public webrtc::PacingController::PacketSender {
public:
static const int kNoPacketHoldback;
PacedSender(std::shared_ptr<IceAgent> ice_agent,
std::shared_ptr<webrtc::Clock> clock,
std::shared_ptr<TaskQueue> task_queue);
~PacedSender();
public:
int Send() { return 0; }
int EnqueueRtpPackets(std::vector<std::unique_ptr<RtpPacket>>& rtp_packets,
int64_t captured_timestamp_us);
int EnqueueRtpPackets(
std::vector<std::unique_ptr<webrtc::RtpPacketToSend>>& rtp_packets);
int EnqueueRtpPacket(std::unique_ptr<webrtc::RtpPacketToSend> rtp_packet);
public:
void SetOnSentPacketFunc(
std::function<void(std::unique_ptr<webrtc::RtpPacketToSend>)>
on_sent_packet_func) {
on_sent_packet_func_ = on_sent_packet_func;
}
void SetGeneratePaddingFunc(
std::function<std::vector<std::unique_ptr<RtpPacket>>(uint32_t, int64_t)>
generat_padding_func) {
generat_padding_func_ = generat_padding_func;
}
public:
void SendPacket(std::unique_ptr<webrtc::RtpPacketToSend> packet,
const webrtc::PacedPacketInfo& cluster_info) override {
if (on_sent_packet_func_) {
if (ssrc_seq_.find(packet->Ssrc()) == ssrc_seq_.end()) {
ssrc_seq_[packet->Ssrc()] = 1;
}
packet->UpdateSequenceNumber(ssrc_seq_[packet->Ssrc()]++);
on_sent_packet_func_(std::move(packet));
}
}
// Should be called after each call to SendPacket().
std::vector<std::unique_ptr<webrtc::RtpPacketToSend>> FetchFec() override {
std::vector<std::unique_ptr<webrtc::RtpPacketToSend>> fec_packets;
return fec_packets;
}
std::vector<std::unique_ptr<webrtc::RtpPacketToSend>> GeneratePadding(
webrtc::DataSize size) override;
// TODO(bugs.webrtc.org/1439830): Make pure once subclasses adapt.
void OnBatchComplete() override {}
// TODO(bugs.webrtc.org/11340): Make pure once downstream projects
// have been updated.
void OnAbortedRetransmissions(
uint32_t /* ssrc */,
rtc::ArrayView<const uint16_t> /* sequence_numbers */) {}
std::optional<uint32_t> GetRtxSsrcForMedia(
uint32_t /* ssrc */) const override {
return std::nullopt;
}
public:
void SetSendBurstInterval(webrtc::TimeDelta burst_interval);
// A probe may be sent without first waing for a media packet.
void SetAllowProbeWithoutMediaPacket(bool allow);
// Ensure that necessary delayed tasks are scheduled.
void EnsureStarted();
// Methods implementing RtpPacketSender.
// Adds the packet to the queue and calls
// PacingController::PacedSender::SendPacket() when it's time to send.
void EnqueuePackets(
std::vector<std::unique_ptr<webrtc::RtpPacketToSend>> packets);
void EnqueuePacket(std::unique_ptr<webrtc::RtpPacketToSend> packet);
// Remove any pending packets matching this SSRC from the packet queue.
void RemovePacketsForSsrc(uint32_t ssrc);
void CreateProbeClusters(
std::vector<webrtc::ProbeClusterConfig> probe_cluster_configs) override;
// Temporarily pause all sending.
void Pause() override;
// Resume sending packets.
void Resume() override;
void SetCongested(bool congested) override;
// Sets the pacing rates. Must be called once before packets can be sent.
void SetPacingRates(webrtc::DataRate pacing_rate,
webrtc::DataRate padding_rate) override;
// Currently audio traffic is not accounted for by pacer and passed through.
// With the introduction of audio BWE, audio traffic will be accounted for
// in the pacer budget calculation. The audio traffic will still be injected
// at high priority.
void SetAccountForAudioPackets(bool account_for_audio) override;
void SetIncludeOverhead() override;
void SetTransportOverhead(webrtc::DataSize overhead_per_packet) override;
// Time since the oldest packet currently in the queue was added.
webrtc::TimeDelta OldestPacketWaitTime() const override;
// Sum of payload + padding bytes of all packets currently in the pacer queue.
webrtc::DataSize QueueSizeData() const override;
// Returns the time when the first packet was sent.
std::optional<webrtc::Timestamp> FirstSentPacketTime() const override;
// Returns the expected number of milliseconds it will take to send the
// current packets in the queue, given the current size and bitrate, ignoring
// priority.
webrtc::TimeDelta ExpectedQueueTime() const override;
// Set the average upper bound on pacer queuing delay. The pacer may send at
// a higher rate than what was configured via SetPacingRates() in order to
// keep ExpectedQueueTimeMs() below `limit_ms` on average.
void SetQueueTimeLimit(webrtc::TimeDelta limit) override;
protected:
// Exposed as protected for test.
struct Stats {
Stats()
: oldest_packet_enqueue_time(webrtc::Timestamp::MinusInfinity()),
queue_size(webrtc::DataSize::Zero()),
expected_queue_time(webrtc::TimeDelta::Zero()) {}
webrtc::Timestamp oldest_packet_enqueue_time;
webrtc::DataSize queue_size;
webrtc::TimeDelta expected_queue_time;
std::optional<webrtc::Timestamp> first_sent_packet_time;
};
void OnStatsUpdated(const Stats& stats);
private:
// Call in response to state updates that could warrant sending out packets.
// Protected against re-entry from packet sent receipts.
void MaybeScheduleProcessPackets();
// Check if it is time to send packets, or schedule a delayed task if not.
// Use Timestamp::MinusInfinity() to indicate that this call has _not_
// been scheduled by the pacing controller. If this is the case, check if we
// can execute immediately otherwise schedule a delay task that calls this
// method again with desired (finite) scheduled process time.
void MaybeProcessPackets(webrtc::Timestamp scheduled_process_time);
void UpdateStats();
Stats GetStats() const;
private:
std::shared_ptr<IceAgent> ice_agent_ = nullptr;
webrtc::PacingController pacing_controller_;
std::function<void(std::unique_ptr<webrtc::RtpPacketToSend>)>
on_sent_packet_func_ = nullptr;
std::function<std::vector<std::unique_ptr<RtpPacket>>(uint32_t, int64_t)>
generat_padding_func_ = nullptr;
private:
std::shared_ptr<webrtc::Clock> clock_ = nullptr;
private:
const webrtc::TimeDelta max_hold_back_window_;
const int max_hold_back_window_in_packets_;
// We want only one (valid) delayed process task in flight at a time.
// If the value of `next_process_time_` is finite, it is an id for a
// delayed task that will call MaybeProcessPackets() with that time
// as parameter.
// Timestamp::MinusInfinity() indicates no valid pending task.
webrtc::Timestamp next_process_time_;
// Indicates if this task queue is started. If not, don't allow
// posting delayed tasks yet.
bool is_started_;
// Indicates if this task queue is shutting down. If so, don't allow
// posting any more delayed tasks as that can cause the task queue to
// never drain.
bool is_shutdown_;
// Filtered size of enqueued packets, in bytes.
rtc::ExpFilter packet_size_;
bool include_overhead_;
Stats current_stats_;
// Protects against ProcessPackets reentry from packet sent receipts.
bool processing_packets_ = false;
std::shared_ptr<TaskQueue> task_queue_;
int64_t transport_seq_ = 0;
std::map<int32_t, int16_t> ssrc_seq_;
webrtc::Timestamp last_send_time_;
webrtc::Timestamp last_call_time_;
};
#endif