[feat] use encode thread to encode frame

This commit is contained in:
dijunkun
2025-03-25 17:18:52 +08:00
parent 160ee9feef
commit bcf01791f7
42 changed files with 269 additions and 492 deletions

View File

@@ -7,26 +7,13 @@ RtpAudioReceiver::RtpAudioReceiver() {}
RtpAudioReceiver::RtpAudioReceiver(std::shared_ptr<IOStatistics> io_statistics)
: io_statistics_(io_statistics) {}
RtpAudioReceiver::~RtpAudioReceiver() {
if (rtp_statistics_) {
rtp_statistics_->Stop();
}
}
RtpAudioReceiver::~RtpAudioReceiver() {}
void RtpAudioReceiver::InsertRtpPacket(RtpPacket& rtp_packet) {
if (!rtp_statistics_) {
rtp_statistics_ = std::make_unique<RtpStatistics>();
rtp_statistics_->Start();
}
last_recv_bytes_ = (uint32_t)rtp_packet.Size();
total_rtp_payload_recv_ += (uint32_t)rtp_packet.PayloadSize();
total_rtp_packets_recv_++;
if (rtp_statistics_) {
rtp_statistics_->UpdateReceiveBytes(last_recv_bytes_);
}
if (io_statistics_) {
io_statistics_->UpdateAudioInboundBytes(last_recv_bytes_);
io_statistics_->IncrementAudioInboundRtpPacketCount();

View File

@@ -12,7 +12,6 @@
#include "io_statistics.h"
#include "receiver_report.h"
#include "rtp_packet.h"
#include "rtp_statistics.h"
#include "sender_report.h"
class RtpAudioReceiver {
@@ -44,7 +43,6 @@ class RtpAudioReceiver {
uint32_t last_complete_frame_ts_ = 0;
private:
std::unique_ptr<RtpStatistics> rtp_statistics_ = nullptr;
std::shared_ptr<IOStatistics> io_statistics_ = nullptr;
uint32_t last_recv_bytes_ = 0;
uint32_t total_rtp_payload_recv_ = 0;

View File

@@ -15,21 +15,10 @@ RtpAudioSender::RtpAudioSender(std::shared_ptr<IOStatistics> io_statistics)
SetThreadName("RtpAudioSender");
}
RtpAudioSender::~RtpAudioSender() {
if (rtp_statistics_) {
rtp_statistics_->Stop();
}
SSRCManager::Instance().DeleteSsrc(ssrc_);
}
RtpAudioSender::~RtpAudioSender() { SSRCManager::Instance().DeleteSsrc(ssrc_); }
void RtpAudioSender::Enqueue(
std::vector<std::unique_ptr<RtpPacket>>& rtp_packets) {
if (!rtp_statistics_) {
rtp_statistics_ = std::make_unique<RtpStatistics>();
rtp_statistics_->Start();
}
for (auto& rtp_packet : rtp_packets) {
rtp_packet_queue_.push(std::move(rtp_packet));
}
@@ -149,9 +138,5 @@ bool RtpAudioSender::Process() {
}
}
if (rtp_statistics_) {
rtp_statistics_->UpdateSentBytes(last_send_bytes_);
}
return true;
}

View File

@@ -13,7 +13,6 @@
#include "receiver_report.h"
#include "ringbuffer.h"
#include "rtp_packet.h"
#include "rtp_statistics.h"
#include "sender_report.h"
#include "thread_base.h"
@@ -44,7 +43,6 @@ class RtpAudioSender : public ThreadBase {
private:
uint32_t ssrc_ = 0;
std::unique_ptr<RtpStatistics> rtp_statistics_ = nullptr;
std::shared_ptr<IOStatistics> io_statistics_ = nullptr;
uint32_t last_send_bytes_ = 0;
uint32_t total_rtp_payload_sent_ = 0;

View File

@@ -7,26 +7,13 @@ RtpDataReceiver::RtpDataReceiver() {}
RtpDataReceiver::RtpDataReceiver(std::shared_ptr<IOStatistics> io_statistics)
: io_statistics_(io_statistics) {}
RtpDataReceiver::~RtpDataReceiver() {
if (rtp_statistics_) {
rtp_statistics_->Stop();
}
}
RtpDataReceiver::~RtpDataReceiver() {}
void RtpDataReceiver::InsertRtpPacket(RtpPacket& rtp_packet) {
if (!rtp_statistics_) {
rtp_statistics_ = std::make_unique<RtpStatistics>();
rtp_statistics_->Start();
}
last_recv_bytes_ = (uint32_t)rtp_packet.Size();
total_rtp_payload_recv_ += (uint32_t)rtp_packet.PayloadSize();
total_rtp_packets_recv_++;
if (rtp_statistics_) {
rtp_statistics_->UpdateReceiveBytes(last_recv_bytes_);
}
if (io_statistics_) {
io_statistics_->UpdateDataInboundBytes(last_recv_bytes_);
io_statistics_->IncrementDataInboundRtpPacketCount();

View File

@@ -6,7 +6,6 @@
#include "io_statistics.h"
#include "receiver_report.h"
#include "rtp_packet.h"
#include "rtp_statistics.h"
#include "sender_report.h"
class RtpDataReceiver {
public:
@@ -37,7 +36,6 @@ class RtpDataReceiver {
uint32_t last_complete_frame_ts_ = 0;
private:
std::unique_ptr<RtpStatistics> rtp_statistics_ = nullptr;
std::shared_ptr<IOStatistics> io_statistics_ = nullptr;
uint32_t last_recv_bytes_ = 0;
uint32_t total_rtp_payload_recv_ = 0;

View File

@@ -15,21 +15,10 @@ RtpDataSender::RtpDataSender(std::shared_ptr<IOStatistics> io_statistics)
SetThreadName("RtpDataSender");
}
RtpDataSender::~RtpDataSender() {
if (rtp_statistics_) {
rtp_statistics_->Stop();
}
SSRCManager::Instance().DeleteSsrc(ssrc_);
}
RtpDataSender::~RtpDataSender() { SSRCManager::Instance().DeleteSsrc(ssrc_); }
void RtpDataSender::Enqueue(
std::vector<std::unique_ptr<RtpPacket>>& rtp_packets) {
if (!rtp_statistics_) {
rtp_statistics_ = std::make_unique<RtpStatistics>();
rtp_statistics_->Start();
}
for (auto& rtp_packet : rtp_packets) {
rtp_packet_queue_.push(std::move(rtp_packet));
}
@@ -149,9 +138,5 @@ bool RtpDataSender::Process() {
}
}
if (rtp_statistics_) {
rtp_statistics_->UpdateSentBytes(last_send_bytes_);
}
return true;
}

View File

@@ -13,7 +13,6 @@
#include "receiver_report.h"
#include "ringbuffer.h"
#include "rtp_packet.h"
#include "rtp_statistics.h"
#include "sender_report.h"
#include "thread_base.h"
@@ -45,7 +44,6 @@ class RtpDataSender : public ThreadBase {
private:
uint32_t ssrc_ = 0;
std::unique_ptr<RtpStatistics> rtp_statistics_ = nullptr;
std::shared_ptr<IOStatistics> io_statistics_ = nullptr;
uint32_t last_send_bytes_ = 0;
uint32_t total_rtp_payload_sent_ = 0;

View File

@@ -77,10 +77,6 @@ RtpVideoReceiver::~RtpVideoReceiver() {
SSRCManager::Instance().DeleteSsrc(ssrc_);
if (rtp_statistics_) {
rtp_statistics_->Stop();
}
delete[] nv12_data_;
#ifdef SAVE_RTP_RECV_STREAM
@@ -93,11 +89,6 @@ RtpVideoReceiver::~RtpVideoReceiver() {
}
void RtpVideoReceiver::InsertRtpPacket(RtpPacket& rtp_packet) {
if (!rtp_statistics_) {
rtp_statistics_ = std::make_unique<RtpStatistics>();
rtp_statistics_->Start();
}
webrtc::RtpPacketReceived rtp_packet_received;
rtp_packet_received.Build(rtp_packet.Buffer().data(), rtp_packet.Size());
rtp_packet_received.set_arrival_time(clock_->CurrentTime());
@@ -153,10 +144,6 @@ void RtpVideoReceiver::InsertRtpPacket(RtpPacket& rtp_packet) {
total_rtp_payload_recv_ += (uint32_t)rtp_packet.PayloadSize();
total_rtp_packets_recv_++;
if (rtp_statistics_) {
rtp_statistics_->UpdateReceiveBytes(last_recv_bytes_);
}
if (io_statistics_) {
io_statistics_->UpdateVideoInboundBytes(last_recv_bytes_);
io_statistics_->IncrementVideoInboundRtpPacketCount();

View File

@@ -21,7 +21,6 @@
#include "rtp_packet_av1.h"
#include "rtp_packet_h264.h"
#include "rtp_rtcp_defines.h"
#include "rtp_statistics.h"
#include "sender_report.h"
#include "thread_base.h"
@@ -103,7 +102,6 @@ class RtpVideoReceiver : public ThreadBase,
RingBuffer<ReceivedFrame> compelete_video_frame_queue_;
private:
std::unique_ptr<RtpStatistics> rtp_statistics_ = nullptr;
std::shared_ptr<IOStatistics> io_statistics_ = nullptr;
uint32_t last_recv_bytes_ = 0;
uint32_t total_rtp_packets_recv_ = 0;

View File

@@ -28,10 +28,6 @@ RtpVideoSender::RtpVideoSender(std::shared_ptr<SystemClock> clock,
}
RtpVideoSender::~RtpVideoSender() {
if (rtp_statistics_) {
rtp_statistics_->Stop();
}
SSRCManager::Instance().DeleteSsrc(ssrc_);
#ifdef SAVE_RTP_SENT_STREAM
@@ -46,11 +42,6 @@ RtpVideoSender::~RtpVideoSender() {
void RtpVideoSender::Enqueue(
std::vector<std::unique_ptr<RtpPacket>>& rtp_packets,
int64_t captured_timestamp_us) {
if (!rtp_statistics_) {
rtp_statistics_ = std::make_unique<RtpStatistics>();
rtp_statistics_->Start();
}
std::vector<std::unique_ptr<webrtc::RtpPacketToSend>> to_send_rtp_packets;
for (auto& rtp_packet : rtp_packets) {
std::unique_ptr<webrtc::RtpPacketToSend> rtp_packet_to_send(
@@ -179,10 +170,6 @@ bool RtpVideoSender::Process() {
}
}
if (rtp_statistics_) {
rtp_statistics_->UpdateSentBytes(last_send_bytes_);
}
return true;
}

View File

@@ -10,7 +10,6 @@
#include "ringbuffer.h"
#include "rtp_packet.h"
#include "rtp_packet_to_send.h"
#include "rtp_statistics.h"
#include "sender_report.h"
#include "thread_base.h"
@@ -59,7 +58,6 @@ class RtpVideoSender : public ThreadBase {
private:
uint32_t ssrc_ = 0;
std::shared_ptr<webrtc::Clock> clock_ = nullptr;
std::unique_ptr<RtpStatistics> rtp_statistics_ = nullptr;
std::shared_ptr<IOStatistics> io_statistics_ = nullptr;
uint32_t last_send_bytes_ = 0;
uint32_t last_send_rtcp_sr_packet_ts_ = 0;

View File

@@ -105,14 +105,14 @@ void VideoChannelSend::Destroy() {
}
}
int VideoChannelSend::SendVideo(std::shared_ptr<EncodedFrame> encoded_frame) {
int VideoChannelSend::SendVideo(const EncodedFrame& encoded_frame) {
if (rtp_video_sender_ && rtp_packetizer_ && packet_sender_) {
int32_t rtp_timestamp =
delta_ntp_internal_ms_ +
static_cast<uint32_t>(encoded_frame->CapturedTimestamp() / 1000);
static_cast<uint32_t>(encoded_frame.CapturedTimestamp() / 1000);
std::vector<std::unique_ptr<RtpPacket>> rtp_packets =
rtp_packetizer_->Build((uint8_t*)encoded_frame->Buffer(),
(uint32_t)encoded_frame->Size(), rtp_timestamp,
rtp_packetizer_->Build((uint8_t*)encoded_frame.Buffer(),
(uint32_t)encoded_frame.Size(), rtp_timestamp,
true);
packet_sender_->EnqueueRtpPacket(std::move(rtp_packets), rtp_timestamp);
}

View File

@@ -53,7 +53,7 @@ class VideoChannelSend {
return 0;
}
int SendVideo(std::shared_ptr<EncodedFrame> encoded_frame);
int SendVideo(const EncodedFrame& encoded_frame);
void OnReceiverReport(const ReceiverReport& receiver_report) {
if (rtp_video_sender_) {

View File

@@ -55,10 +55,13 @@ void IceTransportController::Create(
CreateVideoCodec(clock_, video_codec_payload_type, hardware_acceleration);
CreateAudioCodec();
task_queue_ = std::make_shared<TaskQueue>();
task_queue_cc_ = std::make_shared<TaskQueue>("congest control");
task_queue_encode_ = std::make_shared<TaskQueue>("encode");
task_queue_decode_ = std::make_shared<TaskQueue>("decode");
controller_ = std::make_unique<CongestionControl>();
packet_sender_ =
std::make_shared<PacketSenderImp>(ice_agent, webrtc_clock_, task_queue_);
packet_sender_ = std::make_shared<PacketSenderImp>(ice_agent, webrtc_clock_,
task_queue_cc_);
packet_sender_->SetPacingRates(DataRate::BitsPerSec(300000),
DataRate::Zero());
packet_sender_->SetSendBurstInterval(TimeDelta::Millis(40));
@@ -201,8 +204,6 @@ int IceTransportController::SendVideo(const XVideoFrame* video_frame) {
b_force_i_frame_ = false;
}
bool need_to_release = false;
XVideoFrame new_frame;
new_frame.data = nullptr;
new_frame.width = video_frame->width;
@@ -215,30 +216,34 @@ int IceTransportController::SendVideo(const XVideoFrame* video_frame) {
resolution_adapter_->ResolutionDowngrade(
video_frame, target_width_.value(), target_height_.value(),
&new_frame);
need_to_release = true;
} else {
new_frame.data = new char[video_frame->size];
memcpy((void*)new_frame.data, (void*)video_frame->data,
video_frame->size);
}
}
int ret = video_encoder_->Encode(
need_to_release ? &new_frame : video_frame,
[this](std::shared_ptr<EncodedFrame> encoded_frame) -> int {
if (video_channel_send_) {
video_channel_send_->SendVideo(encoded_frame);
}
RawFrame raw_frame((const uint8_t*)new_frame.data, new_frame.size,
new_frame.width, new_frame.height);
raw_frame.SetCapturedTimestamp(video_frame->captured_timestamp);
return 0;
});
delete[] new_frame.data;
if (need_to_release) {
delete[] new_frame.data;
if (task_queue_encode_ && video_encoder_) {
task_queue_encode_->PostTask([this, raw_frame]() mutable {
int ret = video_encoder_->Encode(
std::move(raw_frame),
[this](const EncodedFrame& encoded_frame) -> int {
if (video_channel_send_) {
video_channel_send_->SendVideo(encoded_frame);
}
return 0;
});
});
}
if (0 != ret) {
LOG_ERROR("Encode failed");
return -1;
} else {
return 0;
}
return 0;
}
int IceTransportController::SendAudio(const char* data, size_t size) {
@@ -506,8 +511,8 @@ void IceTransportController::OnReceiverReport(
msg.start_time = last_report_block_time_;
msg.end_time = now;
if (task_queue_) {
task_queue_->PostTask([this, msg]() mutable {
if (task_queue_cc_) {
task_queue_cc_->PostTask([this, msg]() mutable {
if (controller_) {
PostUpdates(controller_->OnTransportLossReport(msg));
}
@@ -522,8 +527,8 @@ void IceTransportController::OnCongestionControlFeedback(
std::optional<webrtc::TransportPacketsFeedback> feedback_msg =
transport_feedback_adapter_.ProcessCongestionControlFeedback(
feedback, Timestamp::Micros(clock_->CurrentTimeUs()));
if (feedback_msg.has_value() && task_queue_) {
task_queue_->PostTask([this, feedback_msg]() mutable {
if (feedback_msg.has_value() && task_queue_cc_) {
task_queue_cc_->PostTask([this, feedback_msg]() mutable {
if (controller_) {
PostUpdates(
controller_->OnTransportPacketsFeedback(feedback_msg.value()));
@@ -633,8 +638,8 @@ bool IceTransportController::Process() {
return false;
}
if (task_queue_ && controller_) {
task_queue_->PostTask([this]() mutable {
if (task_queue_cc_ && controller_) {
task_queue_cc_->PostTask([this]() mutable {
webrtc::ProcessInterval msg;
msg.at_time = Timestamp::Millis(webrtc_clock_->TimeInMilliseconds());
PostUpdates(controller_->OnProcessInterval(msg));

View File

@@ -120,7 +120,9 @@ class IceTransportController
webrtc::TransportFeedbackAdapter transport_feedback_adapter_;
std::unique_ptr<CongestionControl> controller_;
BitrateProber prober_;
std::shared_ptr<TaskQueue> task_queue_;
std::shared_ptr<TaskQueue> task_queue_cc_;
std::shared_ptr<TaskQueue> task_queue_encode_;
std::shared_ptr<TaskQueue> task_queue_decode_;
webrtc::DataSize congestion_window_size_;
bool is_congested_ = false;