[feat] add task queue module

This commit is contained in:
dijunkun
2025-03-14 18:29:07 +08:00
parent d2b45b91e7
commit bd65d87137
8 changed files with 270 additions and 63 deletions

View File

@@ -36,6 +36,12 @@ class VideoChannelSend {
std::vector<std::unique_ptr<RtpPacket>> GeneratePadding( std::vector<std::unique_ptr<RtpPacket>> GeneratePadding(
uint32_t payload_size, int64_t capture_timestamp_ms); uint32_t payload_size, int64_t capture_timestamp_ms);
int64_t GetTransportSeqAndIncrement() {
int64_t transport_seq = rtp_video_sender_->GetTransportSequenceNumber();
rtp_video_sender_->IncrementTransportSequenceNumber();
return transport_seq;
}
public: public:
void Initialize(rtp::PAYLOAD_TYPE payload_type); void Initialize(rtp::PAYLOAD_TYPE payload_type);
void Destroy(); void Destroy();

View File

@@ -35,6 +35,10 @@ class RtpVideoSender : public ThreadBase {
uint32_t GetSsrc() { return ssrc_; } uint32_t GetSsrc() { return ssrc_; }
void OnReceiverReport(const ReceiverReport &receiver_report); void OnReceiverReport(const ReceiverReport &receiver_report);
int64_t GetTransportSequenceNumber() { return transport_seq_; }
void IncrementTransportSequenceNumber() { transport_seq_++; }
private: private:
int SendRtpPacket( int SendRtpPacket(
std::unique_ptr<webrtc::RtpPacketToSend> rtp_packet_to_send); std::unique_ptr<webrtc::RtpPacketToSend> rtp_packet_to_send);

View File

@@ -0,0 +1,69 @@
/*
* @Author: DI JUNKUN
* @Date: 2025-03-14
* Copyright (c) 2025 by DI JUNKUN, All Rights Reserved.
*/
#ifndef _ANY_INVOCABLE_H_
#define _ANY_INVOCABLE_H_
#include <functional>
#include <iostream>
#include <memory>
// 简化版的 AnyInvocable
template <typename Signature>
class AnyInvocable;
template <typename R, typename... Args>
class AnyInvocable<R(Args...)> {
public:
// 默认构造函数
AnyInvocable() = default;
// 构造函数:接受一个可以调用的对象
template <typename Callable>
AnyInvocable(Callable&& callable)
: callable_(std::make_unique<CallableWrapper<Callable>>(
std::forward<Callable>(callable))) {}
// 调用运算符
R operator()(Args... args) {
return callable_->Invoke(std::forward<Args>(args)...);
}
// 移动构造函数
AnyInvocable(AnyInvocable&&) = default;
// 移动赋值运算符
AnyInvocable& operator=(AnyInvocable&&) = default;
private:
// 抽象基类,允许不同类型的可调用对象
struct CallableBase {
virtual ~CallableBase() = default;
virtual R Invoke(Args&&... args) = 0;
};
// 模板派生类:实际存储 callable 对象
template <typename Callable>
struct CallableWrapper : public CallableBase {
CallableWrapper(Callable&& callable)
: callable_(std::forward<Callable>(callable)) {}
R Invoke(Args&&... args) override {
return callable_(std::forward<Args>(args)...);
}
Callable callable_;
};
std::unique_ptr<CallableBase> callable_;
};
// 简单的包装函数
template <typename R, typename... Args>
AnyInvocable<R(Args...)> MakeMoveOnlyFunction(std::function<R(Args...)>&& f) {
return AnyInvocable<R(Args...)>(std::move(f));
}
#endif

107
src/common/task_queue.h Normal file
View File

@@ -0,0 +1,107 @@
/*
* @Author: DI JUNKUN
* @Date: 2025-03-14
* Copyright (c) 2025 by DI JUNKUN, All Rights Reserved.
*/
#ifndef _TASK_QUEUE_H_
#define _TASK_QUEUE_H_
#include <chrono>
#include <condition_variable>
#include <functional>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>
#include "any_invocable.h"
class TaskQueue {
public:
TaskQueue(size_t numThreads = 1)
: stop_(false), workers_(), taskQueue_(), mutex_(), cond_var_() {
for (size_t i = 0; i < numThreads; i++) {
workers_.emplace_back([this]() { this->WorkerThread(); });
}
}
~TaskQueue() {
{
std::unique_lock<std::mutex> lock(mutex_);
stop_ = true;
}
cond_var_.notify_all();
for (std::thread &worker : workers_) {
if (worker.joinable()) {
worker.join();
}
}
}
// 立即执行任务
void PostTask(AnyInvocable<void()> task) {
PostDelayedTask(std::move(task), 0);
}
// 延迟执行任务
void PostDelayedTask(AnyInvocable<void()> task, int delay_ms) {
auto execute_time =
std::chrono::steady_clock::now() + std::chrono::milliseconds(delay_ms);
{
std::unique_lock<std::mutex> lock(mutex_);
taskQueue_.emplace(execute_time,
std::move(task)); // 确保参数匹配 TaskItem 构造
}
cond_var_.notify_one();
}
private:
struct TaskItem {
std::chrono::steady_clock::time_point execute_time;
AnyInvocable<void()> task = nullptr;
TaskItem(std::chrono::steady_clock::time_point time,
AnyInvocable<void()> func)
: execute_time(time), task(std::move(func)) {}
bool operator>(const TaskItem &other) const {
return execute_time > other.execute_time;
}
};
void WorkerThread() {
while (true) {
AnyInvocable<void()> task;
{
std::unique_lock<std::mutex> lock(mutex_);
cond_var_.wait(lock, [this]() { return !taskQueue_.empty() || stop_; });
if (stop_ && taskQueue_.empty()) return;
auto now = std::chrono::steady_clock::now();
if (taskQueue_.top().execute_time > now) {
cond_var_.wait_until(lock, taskQueue_.top().execute_time,
[this]() { return stop_; });
}
if (stop_ && taskQueue_.empty()) return;
task = std::move(
const_cast<AnyInvocable<void()> &>(taskQueue_.top().task));
taskQueue_.pop();
}
task(); // 执行任务
}
}
std::vector<std::thread> workers_;
std::priority_queue<TaskItem, std::vector<TaskItem>, std::greater<>>
taskQueue_;
mutable std::mutex mutex_;
std::condition_variable cond_var_;
bool stop_;
};
#endif

View File

@@ -412,6 +412,7 @@ void PacingController::ProcessPackets() {
PacedPacketInfo pacing_info; PacedPacketInfo pacing_info;
DataSize recommended_probe_size = DataSize::Zero(); DataSize recommended_probe_size = DataSize::Zero();
bool is_probing = prober_.is_probing(); bool is_probing = prober_.is_probing();
LOG_WARN("is probing");
if (is_probing) { if (is_probing) {
// Probe timing is sensitive, and handled explicitly by BitrateProber, so // Probe timing is sensitive, and handled explicitly by BitrateProber, so
// use actual send time rather than target. // use actual send time rather than target.
@@ -433,7 +434,8 @@ void PacingController::ProcessPackets() {
// exhausted. // exhausted.
std::unique_ptr<RtpPacketToSend> rtp_packet = std::unique_ptr<RtpPacketToSend> rtp_packet =
GetPendingPacket(pacing_info, target_send_time, now); GetPendingPacket(pacing_info, target_send_time, now);
if (rtp_packet == nullptr) { if (!rtp_packet) {
LOG_WARN("rtp_packet == nullptr");
// No packet available to send, check if we should send padding. // No packet available to send, check if we should send padding.
if (now - target_send_time > kMaxPaddingReplayDuration) { if (now - target_send_time > kMaxPaddingReplayDuration) {
// The target send time is more than `kMaxPaddingReplayDuration` behind // The target send time is more than `kMaxPaddingReplayDuration` behind
@@ -472,6 +474,7 @@ void PacingController::ProcessPackets() {
transport_overhead_per_packet_; transport_overhead_per_packet_;
} }
LOG_ERROR("Send packet_size {}", rtp_packet->Size());
packet_sender_->SendPacket(std::move(rtp_packet), pacing_info); packet_sender_->SendPacket(std::move(rtp_packet), pacing_info);
for (auto& packet : packet_sender_->FetchFec()) { for (auto& packet : packet_sender_->FetchFec()) {
EnqueuePacket(std::move(packet)); EnqueuePacket(std::move(packet));

View File

@@ -246,55 +246,63 @@ std::vector<std::unique_ptr<RtpPacket>> RtpPacketizerH264::BuildPadding(
bool use_rtp_packet_to_send) { bool use_rtp_packet_to_send) {
std::vector<std::unique_ptr<RtpPacket>> rtp_packets; std::vector<std::unique_ptr<RtpPacket>> rtp_packets;
version_ = kRtpVersion; uint32_t remaining_size = payload_size;
has_padding_ = true; while (remaining_size > 0) {
has_extension_ = true; uint32_t current_payload_size =
csrc_count_ = 0; std::min<uint32_t>(remaining_size, MAX_NALU_LEN);
marker_ = 0;
uint8_t payload_type = rtp::PAYLOAD_TYPE(payload_type_ - 1);
sequence_number_++;
timestamp_ = kMsToRtpTimestamp * static_cast<uint32_t>(capture_timestamp_ms);
rtp_packet_frame_.clear(); version_ = kRtpVersion;
rtp_packet_frame_.push_back((version_ << 6) | (has_padding_ << 5) | has_padding_ = true;
(has_extension_ << 4) | csrc_count_); has_extension_ = true;
rtp_packet_frame_.push_back((marker_ << 7) | payload_type); csrc_count_ = 0;
rtp_packet_frame_.push_back((sequence_number_ >> 8) & 0xFF); marker_ = 0;
rtp_packet_frame_.push_back(sequence_number_ & 0xFF); uint8_t payload_type = rtp::PAYLOAD_TYPE(payload_type_ - 1);
rtp_packet_frame_.push_back((timestamp_ >> 24) & 0xFF); sequence_number_++;
rtp_packet_frame_.push_back((timestamp_ >> 16) & 0xFF); timestamp_ =
rtp_packet_frame_.push_back((timestamp_ >> 8) & 0xFF); kMsToRtpTimestamp * static_cast<uint32_t>(capture_timestamp_ms);
rtp_packet_frame_.push_back(timestamp_ & 0xFF);
rtp_packet_frame_.push_back((ssrc_ >> 24) & 0xFF);
rtp_packet_frame_.push_back((ssrc_ >> 16) & 0xFF);
rtp_packet_frame_.push_back((ssrc_ >> 8) & 0xFF);
rtp_packet_frame_.push_back(ssrc_ & 0xFF);
for (uint32_t index = 0; index < csrc_count_ && !csrcs_.empty(); index++) { rtp_packet_frame_.clear();
rtp_packet_frame_.push_back((csrcs_[index] >> 24) & 0xFF); rtp_packet_frame_.push_back((version_ << 6) | (has_padding_ << 5) |
rtp_packet_frame_.push_back((csrcs_[index] >> 16) & 0xFF); (has_extension_ << 4) | csrc_count_);
rtp_packet_frame_.push_back((csrcs_[index] >> 8) & 0xFF); rtp_packet_frame_.push_back((marker_ << 7) | payload_type);
rtp_packet_frame_.push_back(csrcs_[index] & 0xFF); rtp_packet_frame_.push_back((sequence_number_ >> 8) & 0xFF);
} rtp_packet_frame_.push_back(sequence_number_ & 0xFF);
rtp_packet_frame_.push_back((timestamp_ >> 24) & 0xFF);
rtp_packet_frame_.push_back((timestamp_ >> 16) & 0xFF);
rtp_packet_frame_.push_back((timestamp_ >> 8) & 0xFF);
rtp_packet_frame_.push_back(timestamp_ & 0xFF);
rtp_packet_frame_.push_back((ssrc_ >> 24) & 0xFF);
rtp_packet_frame_.push_back((ssrc_ >> 16) & 0xFF);
rtp_packet_frame_.push_back((ssrc_ >> 8) & 0xFF);
rtp_packet_frame_.push_back(ssrc_ & 0xFF);
if (has_extension_) { for (uint32_t index = 0; index < csrc_count_ && !csrcs_.empty(); index++) {
AddAbsSendTimeExtension(rtp_packet_frame_); rtp_packet_frame_.push_back((csrcs_[index] >> 24) & 0xFF);
} rtp_packet_frame_.push_back((csrcs_[index] >> 16) & 0xFF);
rtp_packet_frame_.push_back((csrcs_[index] >> 8) & 0xFF);
rtp_packet_frame_.push_back(csrcs_[index] & 0xFF);
}
// Add padding bytes if (has_extension_) {
uint32_t padding_size = payload_size; AddAbsSendTimeExtension(rtp_packet_frame_);
rtp_packet_frame_.insert(rtp_packet_frame_.end(), padding_size - 1, 0); }
rtp_packet_frame_.push_back(padding_size);
if (use_rtp_packet_to_send) { // Add padding bytes
std::unique_ptr<webrtc::RtpPacketToSend> rtp_packet = uint32_t padding_size = current_payload_size;
std::make_unique<webrtc::RtpPacketToSend>(); rtp_packet_frame_.insert(rtp_packet_frame_.end(), padding_size - 1, 0);
rtp_packet->Build(rtp_packet_frame_.data(), rtp_packet_frame_.size());
rtp_packets.emplace_back(std::move(rtp_packet)); if (use_rtp_packet_to_send) {
} else { std::unique_ptr<webrtc::RtpPacketToSend> rtp_packet =
std::unique_ptr<RtpPacket> rtp_packet = std::make_unique<RtpPacket>(); std::make_unique<webrtc::RtpPacketToSend>();
rtp_packet->Build(rtp_packet_frame_.data(), rtp_packet_frame_.size()); rtp_packet->Build(rtp_packet_frame_.data(), rtp_packet_frame_.size());
rtp_packets.emplace_back(std::move(rtp_packet)); rtp_packets.emplace_back(std::move(rtp_packet));
} else {
std::unique_ptr<RtpPacket> rtp_packet = std::make_unique<RtpPacket>();
rtp_packet->Build(rtp_packet_frame_.data(), rtp_packet_frame_.size());
rtp_packets.emplace_back(std::move(rtp_packet));
}
remaining_size -= current_payload_size;
} }
return rtp_packets; return rtp_packets;

View File

@@ -25,11 +25,16 @@ PacketSender::GeneratePadding(webrtc::DataSize size) {
std::vector<std::unique_ptr<webrtc::RtpPacketToSend>> to_send_rtp_packets; std::vector<std::unique_ptr<webrtc::RtpPacketToSend>> to_send_rtp_packets;
std::vector<std::unique_ptr<RtpPacket>> rtp_packets = std::vector<std::unique_ptr<RtpPacket>> rtp_packets =
generat_padding_func_(size.bytes(), clock_->CurrentTime().ms()); generat_padding_func_(size.bytes(), clock_->CurrentTime().ms());
// for (auto &packet : rtp_packets) { for (auto &packet : rtp_packets) {
// std::unique_ptr<webrtc::RtpPacketToSend> rtp_packet_to_send( std::unique_ptr<webrtc::RtpPacketToSend> rtp_packet_to_send(
// static_cast<webrtc::RtpPacketToSend *>(packet.release())); static_cast<webrtc::RtpPacketToSend *>(packet.release()));
// to_send_rtp_packets.push_back(std::move(rtp_packet_to_send));
// } rtp_packet_to_send->set_capture_time(clock_->CurrentTime());
rtp_packet_to_send->set_transport_sequence_number((transport_seq_)++);
rtp_packet_to_send->set_packet_type(webrtc::RtpPacketMediaType::kPadding);
to_send_rtp_packets.push_back(std::move(rtp_packet_to_send));
}
return to_send_rtp_packets; return to_send_rtp_packets;
} }
@@ -67,23 +72,24 @@ void PacketSender::SetPacingRates(webrtc::DataRate pacing_rate,
void PacketSender::EnqueuePackets( void PacketSender::EnqueuePackets(
std::vector<std::unique_ptr<webrtc::RtpPacketToSend>> packets) { std::vector<std::unique_ptr<webrtc::RtpPacketToSend>> packets) {
// task_queue_->PostTask() task_queue_.PostTask([this, packets = std::move(packets)]() mutable {
for (auto &packet : packets) { for (auto &packet : packets) {
size_t packet_size = packet->payload_size() + packet->padding_size(); size_t packet_size = packet->payload_size() + packet->padding_size();
if (include_overhead_) { if (include_overhead_) {
packet_size += packet->headers_size(); packet_size += packet->headers_size();
}
packet_size_.Apply(1, packet_size);
pacing_controller_.EnqueuePacket(std::move(packet));
} }
packet_size_.Apply(1, packet_size); MaybeProcessPackets(webrtc::Timestamp::MinusInfinity());
pacing_controller_.EnqueuePacket(std::move(packet)); });
}
MaybeProcessPackets(webrtc::Timestamp::MinusInfinity());
} }
void PacketSender::RemovePacketsForSsrc(uint32_t ssrc) { void PacketSender::RemovePacketsForSsrc(uint32_t ssrc) {
// task_queue_->PostTask(SafeTask(safety_.flag(), [this, ssrc] { task_queue_.PostTask([this, ssrc] {
pacing_controller_.RemovePacketsForSsrc(ssrc); pacing_controller_.RemovePacketsForSsrc(ssrc);
MaybeProcessPackets(webrtc::Timestamp::MinusInfinity()); MaybeProcessPackets(webrtc::Timestamp::MinusInfinity());
// })); });
} }
void PacketSender::SetAccountForAudioPackets(bool account_for_audio) { void PacketSender::SetAccountForAudioPackets(bool account_for_audio) {

View File

@@ -21,6 +21,7 @@
#include "rtc_base/numerics/exp_filter.h" #include "rtc_base/numerics/exp_filter.h"
#include "rtp_packet_pacer.h" #include "rtp_packet_pacer.h"
#include "rtp_packet_to_send.h" #include "rtp_packet_to_send.h"
#include "task_queue.h"
class PacketSender : public webrtc::RtpPacketPacer, class PacketSender : public webrtc::RtpPacketPacer,
public webrtc::PacingController::PacketSender { public webrtc::PacingController::PacketSender {
@@ -197,6 +198,9 @@ class PacketSender : public webrtc::RtpPacketPacer,
Stats current_stats_; Stats current_stats_;
// Protects against ProcessPackets reentry from packet sent receipts. // Protects against ProcessPackets reentry from packet sent receipts.
bool processing_packets_ = false; bool processing_packets_ = false;
TaskQueue task_queue_;
int64_t transport_seq_ = 0;
}; };
#endif #endif