diff --git a/src/channel/meida_channel/video_channel_send.h b/src/channel/meida_channel/video_channel_send.h index f3334d5..c0c15b4 100644 --- a/src/channel/meida_channel/video_channel_send.h +++ b/src/channel/meida_channel/video_channel_send.h @@ -36,6 +36,12 @@ class VideoChannelSend { std::vector> GeneratePadding( uint32_t payload_size, int64_t capture_timestamp_ms); + int64_t GetTransportSeqAndIncrement() { + int64_t transport_seq = rtp_video_sender_->GetTransportSequenceNumber(); + rtp_video_sender_->IncrementTransportSequenceNumber(); + return transport_seq; + } + public: void Initialize(rtp::PAYLOAD_TYPE payload_type); void Destroy(); diff --git a/src/channel/rtp_channel/rtp_video_sender.h b/src/channel/rtp_channel/rtp_video_sender.h index 12a50ab..32bed4f 100644 --- a/src/channel/rtp_channel/rtp_video_sender.h +++ b/src/channel/rtp_channel/rtp_video_sender.h @@ -35,6 +35,10 @@ class RtpVideoSender : public ThreadBase { uint32_t GetSsrc() { return ssrc_; } void OnReceiverReport(const ReceiverReport &receiver_report); + int64_t GetTransportSequenceNumber() { return transport_seq_; } + + void IncrementTransportSequenceNumber() { transport_seq_++; } + private: int SendRtpPacket( std::unique_ptr rtp_packet_to_send); diff --git a/src/common/any_invocable.h b/src/common/any_invocable.h new file mode 100644 index 0000000..b393ba6 --- /dev/null +++ b/src/common/any_invocable.h @@ -0,0 +1,69 @@ +/* + * @Author: DI JUNKUN + * @Date: 2025-03-14 + * Copyright (c) 2025 by DI JUNKUN, All Rights Reserved. + */ + +#ifndef _ANY_INVOCABLE_H_ +#define _ANY_INVOCABLE_H_ + +#include +#include +#include + +// 简化版的 AnyInvocable +template +class AnyInvocable; + +template +class AnyInvocable { + public: + // 默认构造函数 + AnyInvocable() = default; + + // 构造函数:接受一个可以调用的对象 + template + AnyInvocable(Callable&& callable) + : callable_(std::make_unique>( + std::forward(callable))) {} + + // 调用运算符 + R operator()(Args... args) { + return callable_->Invoke(std::forward(args)...); + } + + // 移动构造函数 + AnyInvocable(AnyInvocable&&) = default; + // 移动赋值运算符 + AnyInvocable& operator=(AnyInvocable&&) = default; + + private: + // 抽象基类,允许不同类型的可调用对象 + struct CallableBase { + virtual ~CallableBase() = default; + virtual R Invoke(Args&&... args) = 0; + }; + + // 模板派生类:实际存储 callable 对象 + template + struct CallableWrapper : public CallableBase { + CallableWrapper(Callable&& callable) + : callable_(std::forward(callable)) {} + + R Invoke(Args&&... args) override { + return callable_(std::forward(args)...); + } + + Callable callable_; + }; + + std::unique_ptr callable_; +}; + +// 简单的包装函数 +template +AnyInvocable MakeMoveOnlyFunction(std::function&& f) { + return AnyInvocable(std::move(f)); +} + +#endif \ No newline at end of file diff --git a/src/common/task_queue.h b/src/common/task_queue.h new file mode 100644 index 0000000..1736650 --- /dev/null +++ b/src/common/task_queue.h @@ -0,0 +1,107 @@ +/* + * @Author: DI JUNKUN + * @Date: 2025-03-14 + * Copyright (c) 2025 by DI JUNKUN, All Rights Reserved. + */ + +#ifndef _TASK_QUEUE_H_ +#define _TASK_QUEUE_H_ + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "any_invocable.h" + +class TaskQueue { + public: + TaskQueue(size_t numThreads = 1) + : stop_(false), workers_(), taskQueue_(), mutex_(), cond_var_() { + for (size_t i = 0; i < numThreads; i++) { + workers_.emplace_back([this]() { this->WorkerThread(); }); + } + } + + ~TaskQueue() { + { + std::unique_lock lock(mutex_); + stop_ = true; + } + cond_var_.notify_all(); + for (std::thread &worker : workers_) { + if (worker.joinable()) { + worker.join(); + } + } + } + + // 立即执行任务 + void PostTask(AnyInvocable task) { + PostDelayedTask(std::move(task), 0); + } + + // 延迟执行任务 + void PostDelayedTask(AnyInvocable task, int delay_ms) { + auto execute_time = + std::chrono::steady_clock::now() + std::chrono::milliseconds(delay_ms); + { + std::unique_lock lock(mutex_); + taskQueue_.emplace(execute_time, + std::move(task)); // 确保参数匹配 TaskItem 构造 + } + cond_var_.notify_one(); + } + + private: + struct TaskItem { + std::chrono::steady_clock::time_point execute_time; + AnyInvocable task = nullptr; + + TaskItem(std::chrono::steady_clock::time_point time, + AnyInvocable func) + : execute_time(time), task(std::move(func)) {} + + bool operator>(const TaskItem &other) const { + return execute_time > other.execute_time; + } + }; + + void WorkerThread() { + while (true) { + AnyInvocable task; + { + std::unique_lock lock(mutex_); + cond_var_.wait(lock, [this]() { return !taskQueue_.empty() || stop_; }); + + if (stop_ && taskQueue_.empty()) return; + + auto now = std::chrono::steady_clock::now(); + if (taskQueue_.top().execute_time > now) { + cond_var_.wait_until(lock, taskQueue_.top().execute_time, + [this]() { return stop_; }); + } + + if (stop_ && taskQueue_.empty()) return; + + task = std::move( + const_cast &>(taskQueue_.top().task)); + taskQueue_.pop(); + } + task(); // 执行任务 + } + } + + std::vector workers_; + std::priority_queue, std::greater<>> + taskQueue_; + mutable std::mutex mutex_; + std::condition_variable cond_var_; + bool stop_; +}; + +#endif diff --git a/src/qos/pacing_controller.cc b/src/qos/pacing_controller.cc index 5a105c9..594ed49 100644 --- a/src/qos/pacing_controller.cc +++ b/src/qos/pacing_controller.cc @@ -412,6 +412,7 @@ void PacingController::ProcessPackets() { PacedPacketInfo pacing_info; DataSize recommended_probe_size = DataSize::Zero(); bool is_probing = prober_.is_probing(); + LOG_WARN("is probing"); if (is_probing) { // Probe timing is sensitive, and handled explicitly by BitrateProber, so // use actual send time rather than target. @@ -433,7 +434,8 @@ void PacingController::ProcessPackets() { // exhausted. std::unique_ptr rtp_packet = GetPendingPacket(pacing_info, target_send_time, now); - if (rtp_packet == nullptr) { + if (!rtp_packet) { + LOG_WARN("rtp_packet == nullptr"); // No packet available to send, check if we should send padding. if (now - target_send_time > kMaxPaddingReplayDuration) { // The target send time is more than `kMaxPaddingReplayDuration` behind @@ -472,6 +474,7 @@ void PacingController::ProcessPackets() { transport_overhead_per_packet_; } + LOG_ERROR("Send packet_size {}", rtp_packet->Size()); packet_sender_->SendPacket(std::move(rtp_packet), pacing_info); for (auto& packet : packet_sender_->FetchFec()) { EnqueuePacket(std::move(packet)); diff --git a/src/rtp/rtp_packetizer/rtp_packetizer_h264.cpp b/src/rtp/rtp_packetizer/rtp_packetizer_h264.cpp index fe1cd30..eb47462 100644 --- a/src/rtp/rtp_packetizer/rtp_packetizer_h264.cpp +++ b/src/rtp/rtp_packetizer/rtp_packetizer_h264.cpp @@ -246,55 +246,63 @@ std::vector> RtpPacketizerH264::BuildPadding( bool use_rtp_packet_to_send) { std::vector> rtp_packets; - version_ = kRtpVersion; - has_padding_ = true; - has_extension_ = true; - csrc_count_ = 0; - marker_ = 0; - uint8_t payload_type = rtp::PAYLOAD_TYPE(payload_type_ - 1); - sequence_number_++; - timestamp_ = kMsToRtpTimestamp * static_cast(capture_timestamp_ms); + uint32_t remaining_size = payload_size; + while (remaining_size > 0) { + uint32_t current_payload_size = + std::min(remaining_size, MAX_NALU_LEN); - rtp_packet_frame_.clear(); - rtp_packet_frame_.push_back((version_ << 6) | (has_padding_ << 5) | - (has_extension_ << 4) | csrc_count_); - rtp_packet_frame_.push_back((marker_ << 7) | payload_type); - rtp_packet_frame_.push_back((sequence_number_ >> 8) & 0xFF); - rtp_packet_frame_.push_back(sequence_number_ & 0xFF); - rtp_packet_frame_.push_back((timestamp_ >> 24) & 0xFF); - rtp_packet_frame_.push_back((timestamp_ >> 16) & 0xFF); - rtp_packet_frame_.push_back((timestamp_ >> 8) & 0xFF); - rtp_packet_frame_.push_back(timestamp_ & 0xFF); - rtp_packet_frame_.push_back((ssrc_ >> 24) & 0xFF); - rtp_packet_frame_.push_back((ssrc_ >> 16) & 0xFF); - rtp_packet_frame_.push_back((ssrc_ >> 8) & 0xFF); - rtp_packet_frame_.push_back(ssrc_ & 0xFF); + version_ = kRtpVersion; + has_padding_ = true; + has_extension_ = true; + csrc_count_ = 0; + marker_ = 0; + uint8_t payload_type = rtp::PAYLOAD_TYPE(payload_type_ - 1); + sequence_number_++; + timestamp_ = + kMsToRtpTimestamp * static_cast(capture_timestamp_ms); - for (uint32_t index = 0; index < csrc_count_ && !csrcs_.empty(); index++) { - rtp_packet_frame_.push_back((csrcs_[index] >> 24) & 0xFF); - rtp_packet_frame_.push_back((csrcs_[index] >> 16) & 0xFF); - rtp_packet_frame_.push_back((csrcs_[index] >> 8) & 0xFF); - rtp_packet_frame_.push_back(csrcs_[index] & 0xFF); - } + rtp_packet_frame_.clear(); + rtp_packet_frame_.push_back((version_ << 6) | (has_padding_ << 5) | + (has_extension_ << 4) | csrc_count_); + rtp_packet_frame_.push_back((marker_ << 7) | payload_type); + rtp_packet_frame_.push_back((sequence_number_ >> 8) & 0xFF); + rtp_packet_frame_.push_back(sequence_number_ & 0xFF); + rtp_packet_frame_.push_back((timestamp_ >> 24) & 0xFF); + rtp_packet_frame_.push_back((timestamp_ >> 16) & 0xFF); + rtp_packet_frame_.push_back((timestamp_ >> 8) & 0xFF); + rtp_packet_frame_.push_back(timestamp_ & 0xFF); + rtp_packet_frame_.push_back((ssrc_ >> 24) & 0xFF); + rtp_packet_frame_.push_back((ssrc_ >> 16) & 0xFF); + rtp_packet_frame_.push_back((ssrc_ >> 8) & 0xFF); + rtp_packet_frame_.push_back(ssrc_ & 0xFF); - if (has_extension_) { - AddAbsSendTimeExtension(rtp_packet_frame_); - } + for (uint32_t index = 0; index < csrc_count_ && !csrcs_.empty(); index++) { + rtp_packet_frame_.push_back((csrcs_[index] >> 24) & 0xFF); + rtp_packet_frame_.push_back((csrcs_[index] >> 16) & 0xFF); + rtp_packet_frame_.push_back((csrcs_[index] >> 8) & 0xFF); + rtp_packet_frame_.push_back(csrcs_[index] & 0xFF); + } - // Add padding bytes - uint32_t padding_size = payload_size; - rtp_packet_frame_.insert(rtp_packet_frame_.end(), padding_size - 1, 0); - rtp_packet_frame_.push_back(padding_size); + if (has_extension_) { + AddAbsSendTimeExtension(rtp_packet_frame_); + } - if (use_rtp_packet_to_send) { - std::unique_ptr rtp_packet = - std::make_unique(); - rtp_packet->Build(rtp_packet_frame_.data(), rtp_packet_frame_.size()); - rtp_packets.emplace_back(std::move(rtp_packet)); - } else { - std::unique_ptr rtp_packet = std::make_unique(); - rtp_packet->Build(rtp_packet_frame_.data(), rtp_packet_frame_.size()); - rtp_packets.emplace_back(std::move(rtp_packet)); + // Add padding bytes + uint32_t padding_size = current_payload_size; + rtp_packet_frame_.insert(rtp_packet_frame_.end(), padding_size - 1, 0); + + if (use_rtp_packet_to_send) { + std::unique_ptr rtp_packet = + std::make_unique(); + rtp_packet->Build(rtp_packet_frame_.data(), rtp_packet_frame_.size()); + rtp_packets.emplace_back(std::move(rtp_packet)); + } else { + std::unique_ptr rtp_packet = std::make_unique(); + rtp_packet->Build(rtp_packet_frame_.data(), rtp_packet_frame_.size()); + rtp_packets.emplace_back(std::move(rtp_packet)); + } + + remaining_size -= current_payload_size; } return rtp_packets; diff --git a/src/transport/packet_sender.cpp b/src/transport/packet_sender.cpp index 8a4561e..9c2b6e9 100644 --- a/src/transport/packet_sender.cpp +++ b/src/transport/packet_sender.cpp @@ -25,11 +25,16 @@ PacketSender::GeneratePadding(webrtc::DataSize size) { std::vector> to_send_rtp_packets; std::vector> rtp_packets = generat_padding_func_(size.bytes(), clock_->CurrentTime().ms()); - // for (auto &packet : rtp_packets) { - // std::unique_ptr rtp_packet_to_send( - // static_cast(packet.release())); - // to_send_rtp_packets.push_back(std::move(rtp_packet_to_send)); - // } + for (auto &packet : rtp_packets) { + std::unique_ptr rtp_packet_to_send( + static_cast(packet.release())); + + rtp_packet_to_send->set_capture_time(clock_->CurrentTime()); + rtp_packet_to_send->set_transport_sequence_number((transport_seq_)++); + rtp_packet_to_send->set_packet_type(webrtc::RtpPacketMediaType::kPadding); + + to_send_rtp_packets.push_back(std::move(rtp_packet_to_send)); + } return to_send_rtp_packets; } @@ -67,23 +72,24 @@ void PacketSender::SetPacingRates(webrtc::DataRate pacing_rate, void PacketSender::EnqueuePackets( std::vector> packets) { - // task_queue_->PostTask() - for (auto &packet : packets) { - size_t packet_size = packet->payload_size() + packet->padding_size(); - if (include_overhead_) { - packet_size += packet->headers_size(); + task_queue_.PostTask([this, packets = std::move(packets)]() mutable { + for (auto &packet : packets) { + size_t packet_size = packet->payload_size() + packet->padding_size(); + if (include_overhead_) { + packet_size += packet->headers_size(); + } + packet_size_.Apply(1, packet_size); + pacing_controller_.EnqueuePacket(std::move(packet)); } - packet_size_.Apply(1, packet_size); - pacing_controller_.EnqueuePacket(std::move(packet)); - } - MaybeProcessPackets(webrtc::Timestamp::MinusInfinity()); + MaybeProcessPackets(webrtc::Timestamp::MinusInfinity()); + }); } void PacketSender::RemovePacketsForSsrc(uint32_t ssrc) { - // task_queue_->PostTask(SafeTask(safety_.flag(), [this, ssrc] { - pacing_controller_.RemovePacketsForSsrc(ssrc); - MaybeProcessPackets(webrtc::Timestamp::MinusInfinity()); - // })); + task_queue_.PostTask([this, ssrc] { + pacing_controller_.RemovePacketsForSsrc(ssrc); + MaybeProcessPackets(webrtc::Timestamp::MinusInfinity()); + }); } void PacketSender::SetAccountForAudioPackets(bool account_for_audio) { diff --git a/src/transport/packet_sender.h b/src/transport/packet_sender.h index 69761ba..a3da1d6 100644 --- a/src/transport/packet_sender.h +++ b/src/transport/packet_sender.h @@ -21,6 +21,7 @@ #include "rtc_base/numerics/exp_filter.h" #include "rtp_packet_pacer.h" #include "rtp_packet_to_send.h" +#include "task_queue.h" class PacketSender : public webrtc::RtpPacketPacer, public webrtc::PacingController::PacketSender { @@ -197,6 +198,9 @@ class PacketSender : public webrtc::RtpPacketPacer, Stats current_stats_; // Protects against ProcessPackets reentry from packet sent receipts. bool processing_packets_ = false; + + TaskQueue task_queue_; + int64_t transport_seq_ = 0; }; #endif \ No newline at end of file