[fix] fix pacer module crash due to multi-thread

This commit is contained in:
dijunkun
2025-03-21 16:49:43 +08:00
parent 1d41b6499f
commit d17b29dfa4
20 changed files with 131 additions and 83 deletions

View File

@@ -11,7 +11,8 @@
#define NV12_BUFFER_SIZE (1280 * 720 * 3 / 2)
#define RTCP_RR_INTERVAL 1000
#define MAX_WAIT_TIME_MS 20 // 20ms
#define MAX_WAIT_TIME_MS 20 // 20ms
#define NACK_UPDATE_INTERVAL 20 // 20ms
RtpVideoReceiver::RtpVideoReceiver(std::shared_ptr<SystemClock> clock)
: ssrc_(GenerateUniqueSsrc()),
@@ -148,11 +149,6 @@ void RtpVideoReceiver::InsertRtpPacket(RtpPacket& rtp_packet) {
file_rtp_recv_);
#endif
receive_side_congestion_controller_.OnReceivedPacket(rtp_packet_received,
MediaType::VIDEO);
nack_->OnReceivedPacket(rtp_packet.SequenceNumber());
last_recv_bytes_ = (uint32_t)rtp_packet.PayloadSize();
total_rtp_payload_recv_ += (uint32_t)rtp_packet.PayloadSize();
total_rtp_packets_recv_++;
@@ -167,6 +163,13 @@ void RtpVideoReceiver::InsertRtpPacket(RtpPacket& rtp_packet) {
io_statistics_->UpdateVideoPacketLossCount(rtp_packet.SequenceNumber());
}
uint32_t now_ts = static_cast<uint32_t>(
std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now().time_since_epoch())
.count());
CheckIsTimeUpdateNack(now_ts);
// if (CheckIsTimeSendRR()) {
// ReceiverReport rtcp_rr;
// RtcpReportBlock report;
@@ -208,14 +211,20 @@ void RtpVideoReceiver::InsertRtpPacket(RtpPacket& rtp_packet) {
RtpPacketH264 rtp_packet_h264;
if (rtp_packet_h264.Build(rtp_packet.Buffer().data(), rtp_packet.Size())) {
rtp_packet_h264.GetFrameHeaderInfo();
ProcessH264RtpPacket(rtp_packet_h264);
} else {
LOG_ERROR("Invalid h264 rtp packet");
bool is_missing_packet = ProcessH264RtpPacket(rtp_packet_h264);
if (!is_missing_packet) {
receive_side_congestion_controller_.OnReceivedPacket(
rtp_packet_received, MediaType::VIDEO);
nack_->OnReceivedPacket(rtp_packet.SequenceNumber(), true);
} else {
nack_->OnReceivedPacket(rtp_packet.SequenceNumber(), false);
}
}
}
}
void RtpVideoReceiver::ProcessH264RtpPacket(RtpPacketH264& rtp_packet_h264) {
bool RtpVideoReceiver::ProcessH264RtpPacket(RtpPacketH264& rtp_packet_h264) {
bool is_missing_packet = false;
if (!fec_enable_) {
if (rtp::PAYLOAD_TYPE::H264 == rtp_packet_h264.PayloadType()) {
rtp::NAL_UNIT_TYPE nalu_type = rtp_packet_h264.NalUnitType();
@@ -232,21 +241,31 @@ void RtpVideoReceiver::ProcessH264RtpPacket(RtpPacketH264& rtp_packet_h264) {
} else if (rtp::NAL_UNIT_TYPE::FU_A == nalu_type) {
incomplete_h264_frame_list_[rtp_packet_h264.SequenceNumber()] =
rtp_packet_h264;
if (incomplete_h264_frame_list_.find(
rtp_packet_h264.SequenceNumber()) ==
incomplete_h264_frame_list_.end()) {
LOG_ERROR("missing seq {}", rtp_packet_h264.SequenceNumber());
}
if (rtp_packet_h264.FuAEnd()) {
CheckIsH264FrameCompletedFuaEndReceived(rtp_packet_h264);
} else {
auto missing_seqs_iter =
missing_sequence_numbers_.find(rtp_packet_h264.Timestamp());
auto missing_seqs_wait_ts_iter =
missing_sequence_numbers_wait_time_.find(
rtp_packet_h264.Timestamp());
if (missing_seqs_iter != missing_sequence_numbers_.end()) {
auto missing_seqs = missing_seqs_iter->second;
if (missing_seqs.find(rtp_packet_h264.SequenceNumber()) !=
missing_seqs.end()) {
CheckIsH264FrameCompletedMissSeqReceived(rtp_packet_h264);
if (missing_seqs_wait_ts_iter !=
missing_sequence_numbers_wait_time_.end()) {
if (clock_->CurrentTime().ms() -
missing_seqs_wait_ts_iter->second <=
MAX_WAIT_TIME_MS) {
auto missing_seqs = missing_seqs_iter->second;
if (missing_seqs.find(rtp_packet_h264.SequenceNumber()) !=
missing_seqs.end()) {
CheckIsH264FrameCompletedMissSeqReceived(rtp_packet_h264);
is_missing_packet = true;
}
} else {
missing_sequence_numbers_wait_time_.erase(
missing_seqs_wait_ts_iter);
missing_sequence_numbers_.erase(missing_seqs_iter);
}
}
}
}
@@ -359,6 +378,8 @@ void RtpVideoReceiver::ProcessH264RtpPacket(RtpPacketH264& rtp_packet_h264) {
// }
// }
// }
return is_missing_packet;
}
void RtpVideoReceiver::ProcessAv1RtpPacket(RtpPacketAv1& rtp_packet_av1) {
@@ -384,7 +405,7 @@ void RtpVideoReceiver::ProcessAv1RtpPacket(RtpPacketAv1& rtp_packet_av1) {
bool RtpVideoReceiver::CheckIsH264FrameCompletedFuaEndReceived(
RtpPacketH264& rtp_packet_h264) {
uint64_t timestamp = rtp_packet_h264.Timestamp();
uint32_t timestamp = rtp_packet_h264.Timestamp();
uint16_t end_seq = rtp_packet_h264.SequenceNumber();
fua_end_sequence_numbers_[timestamp] = end_seq;
uint16_t start_seq = 0;
@@ -427,7 +448,7 @@ bool RtpVideoReceiver::CheckIsH264FrameCompletedMissSeqReceived(
return false;
}
uint64_t timestamp = rtp_packet_h264.Timestamp();
uint32_t timestamp = rtp_packet_h264.Timestamp();
uint16_t end_seq = fua_end_sequence_numbers_[timestamp];
uint16_t start_seq = 0;
bool has_start = false;
@@ -468,7 +489,7 @@ bool RtpVideoReceiver::CheckIsH264FrameCompletedMissSeqReceived(
}
bool RtpVideoReceiver::PopCompleteFrame(uint16_t start_seq, uint16_t end_seq,
uint64_t timestamp) {
uint32_t timestamp) {
size_t complete_frame_size = 0;
int frame_fragment_count = 0;
@@ -630,20 +651,24 @@ void RtpVideoReceiver::SendRemb(int64_t bitrate_bps,
active_remb_module_->SetRemb(bitrate_bps, std::move(ssrcs));
}
bool RtpVideoReceiver::CheckIsTimeSendRR() {
uint32_t now_ts = static_cast<uint32_t>(
std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now().time_since_epoch())
.count());
if (now_ts - last_send_rtcp_rr_packet_ts_ >= RTCP_RR_INTERVAL) {
last_send_rtcp_rr_packet_ts_ = now_ts;
bool RtpVideoReceiver::CheckIsTimeSendRR(uint32_t now) {
if (now - last_send_rtcp_rr_packet_ts_ >= RTCP_RR_INTERVAL) {
last_send_rtcp_rr_packet_ts_ = now;
return true;
} else {
return false;
}
}
void RtpVideoReceiver::CheckIsTimeUpdateNack(uint32_t now) {
if (now - last_nack_update_ts_ >= NACK_UPDATE_INTERVAL) {
last_send_rtcp_rr_packet_ts_ = now;
if (nack_) {
nack_->ProcessNacks();
}
}
}
bool RtpVideoReceiver::Process() {
if (!compelete_video_frame_queue_.isEmpty()) {
std::optional<ReceivedFrame> video_frame =

View File

@@ -55,14 +55,15 @@ class RtpVideoReceiver : public ThreadBase,
bool CheckIsAv1FrameCompleted(RtpPacketAv1& rtp_packet_av1);
private:
void ProcessH264RtpPacket(RtpPacketH264& rtp_packet_h264);
bool ProcessH264RtpPacket(RtpPacketH264& rtp_packet_h264);
bool CheckIsH264FrameCompletedFuaEndReceived(RtpPacketH264& rtp_packet_h264);
bool CheckIsH264FrameCompletedMissSeqReceived(RtpPacketH264& rtp_packet_h264);
bool PopCompleteFrame(uint16_t start_seq, uint16_t end_seq,
uint64_t timestamp);
uint32_t timestamp);
private:
bool CheckIsTimeSendRR();
bool CheckIsTimeSendRR(uint32_t now);
void CheckIsTimeUpdateNack(uint32_t now);
int SendRtcpRR(ReceiverReport& rtcp_rr);
void SendCombinedRtcpPacket(
@@ -106,6 +107,7 @@ class RtpVideoReceiver : public ThreadBase,
uint32_t total_rtp_payload_recv_ = 0;
uint32_t last_send_rtcp_rr_packet_ts_ = 0;
uint32_t last_nack_update_ts_ = 0;
std::function<int(const char*, size_t)> data_send_func_ = nullptr;
private:

View File

@@ -107,7 +107,7 @@ void VideoChannelSend::Destroy() {
int VideoChannelSend::SendVideo(std::shared_ptr<EncodedFrame> encoded_frame) {
if (rtp_video_sender_ && rtp_packetizer_) {
int64_t rtp_timestamp =
int32_t rtp_timestamp =
delta_ntp_internal_ms_ +
static_cast<uint32_t>(encoded_frame->CapturedTimestamp() / 1000);
std::vector<std::unique_ptr<RtpPacket>> rtp_packets =

View File

@@ -53,8 +53,10 @@ void IceTransportController::Create(
CreateVideoCodec(clock_, video_codec_payload_type, hardware_acceleration);
CreateAudioCodec();
task_queue_ = std::make_shared<TaskQueue>();
controller_ = std::make_unique<CongestionControl>();
packet_sender_ = std::make_shared<PacketSenderImp>(ice_agent, webrtc_clock_);
packet_sender_ =
std::make_shared<PacketSenderImp>(ice_agent, webrtc_clock_, task_queue_);
packet_sender_->SetPacingRates(DataRate::BitsPerSec(300000),
DataRate::Zero());
packet_sender_->SetSendBurstInterval(TimeDelta::Millis(40));
@@ -481,9 +483,13 @@ void IceTransportController::OnReceiverReport(
msg.receive_time = now;
msg.start_time = last_report_block_time_;
msg.end_time = now;
if (controller_) {
PostUpdates(controller_->OnTransportLossReport(msg));
}
task_queue_->PostTask([this, msg]() mutable {
if (controller_) {
PostUpdates(controller_->OnTransportLossReport(msg));
}
});
last_report_block_time_ = now;
}
@@ -492,19 +498,18 @@ void IceTransportController::OnCongestionControlFeedback(
std::optional<webrtc::TransportPacketsFeedback> feedback_msg =
transport_feedback_adapter_.ProcessCongestionControlFeedback(
feedback, Timestamp::Micros(clock_->CurrentTimeUs()));
if (feedback_msg) {
HandleTransportPacketsFeedback(*feedback_msg);
if (feedback_msg.has_value()) {
task_queue_->PostTask([this, feedback_msg]() mutable {
if (controller_) {
PostUpdates(
controller_->OnTransportPacketsFeedback(feedback_msg.value()));
}
});
UpdateCongestedState();
}
}
void IceTransportController::HandleTransportPacketsFeedback(
const webrtc::TransportPacketsFeedback& feedback) {
if (controller_)
PostUpdates(controller_->OnTransportPacketsFeedback(feedback));
UpdateCongestedState();
}
void IceTransportController::OnReceiveNack(
const std::vector<uint16_t>& nack_sequence_numbers) {
if (video_channel_send_) {
@@ -512,12 +517,6 @@ void IceTransportController::OnReceiveNack(
}
}
void IceTransportController::UpdateControllerWithTimeInterval() {
ProcessInterval msg;
msg.at_time = Timestamp::Millis(webrtc_clock_->TimeInMilliseconds());
PostUpdates(controller_->OnProcessInterval(msg));
}
void IceTransportController::OnSentRtpPacket(
const webrtc::RtpPacketToSend& packet) {
webrtc::PacedPacketInfo pacing_info;
@@ -604,8 +603,11 @@ std::optional<bool> IceTransportController::GetCongestedStateUpdate() const {
}
bool IceTransportController::Process() {
webrtc::ProcessInterval msg;
msg.at_time = Timestamp::Millis(webrtc_clock_->TimeInMilliseconds());
PostUpdates(controller_->OnProcessInterval(msg));
task_queue_->PostTask([this]() mutable {
webrtc::ProcessInterval msg;
msg.at_time = Timestamp::Millis(webrtc_clock_->TimeInMilliseconds());
PostUpdates(controller_->OnProcessInterval(msg));
});
return true;
}

View File

@@ -24,6 +24,7 @@
#include "packet_sender.h"
#include "packet_sender_imp.h"
#include "resolution_adapter.h"
#include "task_queue.h"
#include "transport_feedback_adapter.h"
#include "video_channel_receive.h"
#include "video_channel_send.h"
@@ -82,10 +83,7 @@ class IceTransportController
int CreateAudioCodec();
private:
void UpdateControllerWithTimeInterval();
void OnSentRtpPacket(const webrtc::RtpPacketToSend &packet);
void HandleTransportPacketsFeedback(
const webrtc::TransportPacketsFeedback &feedback);
void PostUpdates(webrtc::NetworkControlUpdate update);
void UpdateControlState();
void UpdateCongestedState();
@@ -121,6 +119,7 @@ class IceTransportController
webrtc::TransportFeedbackAdapter transport_feedback_adapter_;
std::unique_ptr<CongestionControl> controller_;
BitrateProber prober_;
std::shared_ptr<TaskQueue> task_queue_;
webrtc::DataSize congestion_window_size_;
bool is_congested_ = false;

View File

@@ -6,7 +6,8 @@
const int PacketSenderImp::kNoPacketHoldback = -1;
PacketSenderImp::PacketSenderImp(std::shared_ptr<IceAgent> ice_agent,
std::shared_ptr<webrtc::Clock> clock)
std::shared_ptr<webrtc::Clock> clock,
std::shared_ptr<TaskQueue> task_queue)
: ice_agent_(ice_agent),
clock_(clock),
pacing_controller_(clock.get(), this),
@@ -18,7 +19,8 @@ PacketSenderImp::PacketSenderImp(std::shared_ptr<IceAgent> ice_agent,
packet_size_(/*alpha=*/0.95),
include_overhead_(false),
last_send_time_(webrtc::Timestamp::Millis(0)),
last_call_time_(webrtc::Timestamp::Millis(0)) {}
last_call_time_(webrtc::Timestamp::Millis(0)),
task_queue_(task_queue) {}
PacketSenderImp::~PacketSenderImp() {}
@@ -80,7 +82,7 @@ void PacketSenderImp::SetPacingRates(webrtc::DataRate pacing_rate,
void PacketSenderImp::EnqueuePackets(
std::vector<std::unique_ptr<webrtc::RtpPacketToSend>> packets) {
task_queue_.PostTask([this, packets = std::move(packets)]() mutable {
task_queue_->PostTask([this, packets = std::move(packets)]() mutable {
for (auto &packet : packets) {
size_t packet_size = packet->payload_size() + packet->padding_size();
if (include_overhead_) {
@@ -94,7 +96,7 @@ void PacketSenderImp::EnqueuePackets(
}
void PacketSenderImp::RemovePacketsForSsrc(uint32_t ssrc) {
task_queue_.PostTask([this, ssrc] {
task_queue_->PostTask([this, ssrc] {
pacing_controller_.RemovePacketsForSsrc(ssrc);
MaybeProcessPackets(webrtc::Timestamp::MinusInfinity());
});
@@ -226,7 +228,7 @@ void PacketSenderImp::MaybeProcessPackets(
if (next_process_time_.IsMinusInfinity() ||
next_process_time_ > next_send_time) {
// Prefer low precision if allowed and not probing.
task_queue_.PostDelayedTask(
task_queue_->PostDelayedTask(
[this, next_send_time]() { MaybeProcessPackets(next_send_time); },
time_to_next_process.RoundUpTo(webrtc::TimeDelta::Millis(1)).ms());
next_process_time_ = next_send_time;
@@ -281,7 +283,6 @@ int PacketSenderImp::EnqueueRtpPacket(
rtp_packet_to_send->set_packet_type(webrtc::RtpPacketMediaType::kVideo);
break;
}
// webrtc::PacedPacketInfo cluster_info;
// SendPacket(std::move(rtp_packet_to_send), cluster_info);

View File

@@ -31,7 +31,8 @@ class PacketSenderImp : public PacketSender,
static const int kNoPacketHoldback;
PacketSenderImp(std::shared_ptr<IceAgent> ice_agent,
std::shared_ptr<webrtc::Clock> clock);
std::shared_ptr<webrtc::Clock> clock,
std::shared_ptr<TaskQueue> task_queue);
~PacketSenderImp();
public:
@@ -221,7 +222,7 @@ class PacketSenderImp : public PacketSender,
// Protects against ProcessPackets reentry from packet sent receipts.
bool processing_packets_ = false;
TaskQueue task_queue_;
std::shared_ptr<TaskQueue> task_queue_;
int64_t transport_seq_ = 0;
std::map<int32_t, int16_t> ssrc_seq_;